Distributing Workloads More Evenly Between Parquet Pipes #65963

@Clark0

Description

Describe the situation
I'm testing Parquet read performance with 26 files on a 64-core machine. Each file contains between 1 and 9 row groups. The files were generated with Trino from the TPC-DS 100 store_sales table.

How to reproduce

  • ClickHouse version 24.7.1.702
SELECT
    num_row_groups,
    num_rows
FROM file('./store_sales/*', ParquetMetaData)
FORMAT TSV

1	553238
1	604702
1	411712
1	25732
1	411712
1	411712
1	411712
6	21376859
1	257320
1	141526
1	411712
9	29035346
1	411712
1	585403
1	411712
9	28845572
9	28771593
1	467656
9	28826273
1	527506
9	29073943
1	411712
9	28961365
9	28729778
9	28903470
9	29016046

If I run the test with the default settings, it takes approximately 7 seconds to complete.

SELECT
    ss_sold_date_sk,
    ss_customer_sk,
    ss_hdemo_sk,
    ss_addr_sk,
    ss_store_sk,
    ss_ticket_number,
    ss_ext_sales_price,
    ss_ext_list_price,
    ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`

Query id: 46f28761-8370-4095-80ba-3d38154376f1

Ok.

0 rows in set. Elapsed: 7.704 sec. Processed 163.76 million rows, 6.02 GB (21.26 million rows/s., 781.96 MB/s.)

However, if I increase max_parsing_threads to 64 or 512, the query becomes significantly faster.

SELECT
    ss_sold_date_sk,
    ss_customer_sk,
    ss_hdemo_sk,
    ss_addr_sk,
    ss_store_sk,
    ss_ticket_number,
    ss_ext_sales_price,
    ss_ext_list_price,
    ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`
SETTINGS max_parsing_threads = 64

Query id: 254f7a90-5640-4ff0-8d59-e361043e96f3

Ok.

0 rows in set. Elapsed: 4.145 sec. Processed 163.37 million rows, 6.01 GB (39.42 million rows/s., 1.45 GB/s.)

SELECT
    ss_sold_date_sk,
    ss_customer_sk,
    ss_hdemo_sk,
    ss_addr_sk,
    ss_store_sk,
    ss_ticket_number,
    ss_ext_sales_price,
    ss_ext_list_price,
    ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`
SETTINGS max_parsing_threads = 512

Query id: d1e4ef00-d343-483f-9272-8acfb2a0c73f

Ok.

0 rows in set. Elapsed: 1.709 sec. Processed 163.76 million rows, 6.02 GB (95.83 million rows/s., 3.53 GB/s.)

This setting controls the number of threads each Parquet pipe can use. However, each file contains a different number of row groups, so the amount of data read by each Parquet pipe varies widely.

const auto max_parsing_threads = std::max<size_t>(settings.max_parsing_threads / file_num, 1UL);

The number of pipes is limited by the number of files:

if (max_num_streams > files_to_read)

Proposal

Always use num_threads pipes and create a read pool similar to MergeTreeReadPool. Each pipe retrieves new tasks from the read pool, so we don't need to create extra thread pools for each pipe, and the workload is distributed more evenly between the pipes.
