Multistage prewhere in parquet reader v3 (#93542)
Conversation
Pull request overview
This PR implements multi-stage prewhere optimization for data lake storage formats (Iceberg, Delta Lake, Hudi) using the Parquet reader v3. The main change enables data lake storages to support prewhere filtering and extends the Parquet reader to handle multiple sequential prewhere steps, applying filters incrementally as columns are read.
Changes:
- Enabled prewhere support for data lake configurations by removing the check that previously disabled it
- Refactored the Parquet reader to support multi-stage prewhere with sequential filter application
- Added integration tests covering multi-stage prewhere with various filter combinations
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Storages/ObjectStorage/StorageObjectStorage.cpp | Removed check that disabled prewhere for data lake configurations |
| src/Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h | New header exposing tryBuildPrewhereSteps function for use in Parquet reader |
| src/Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.cpp | Moved includes to header file |
| src/Storages/MergeTree/MergeTreeSelectProcessor.cpp | Removed forward declaration, now using header |
| src/Processors/Formats/Impl/Parquet/Reader.h | Changed prewhere tracking from boolean to step indices, restructured Step to support multi-stage |
| src/Processors/Formats/Impl/Parquet/Reader.cpp | Implemented multi-stage prewhere logic with step-by-step filter application |
| src/Processors/Formats/Impl/Parquet/ReadManager.h | Added step_idx parameter to task and state tracking functions |
| src/Processors/Formats/Impl/Parquet/ReadManager.cpp | Refactored read stages from separate Prewhere/Main stages to unified step-based approach |
| src/Processors/Formats/Impl/Parquet/ReadCommon.h | Consolidated ReadStage enum, removed PrewhereOffsetIndex/PrewhereData/MainOffsetIndex/MainData stages |
| src/Common/ProfileEvents.cpp | Added metrics for tracking rows and columns processed by filter expressions |
| tests/integration/helpers/iceberg_utils.py | Added generate_data_complex helper for creating test data with multiple columns |
| tests/integration/test_storage_iceberg_with_spark/test_single_iceberg_file.py | Added comprehensive test_multistage_prewhere test with multiple filter scenarios |
The same benchmark but with 2M rows: My solution: Previous:

Let's try more columns (exactly 7): Generator: My solution: Previous:

@al13n321 could you review please?
```cpp
for (size_t i = 0; i < filter_column->size(); ++i)
{
    Field field;
    filter_column->get(i, field);
}
```
```cpp
if (rows_pass == filter.size())
    /// Nothing was filtered out.
    continue;
```
```diff
 {
-    if (reader.primitive_columns[i].use_prewhere != is_prewhere)
+    if ((step_idx == 0 && !reader.primitive_columns[i].steps_to_calculate.empty()) ||
+        (step_idx > 0 && !reader.primitive_columns[i].steps_to_calculate.contains(step_idx)))
```
If `steps_to_calculate` has more than one element, this will try to read the column multiple times? Indeed, this fails an assert:

```sql
insert into function file('t.parquet') select number as x, number as y, number as z from numbers(10) settings engine_file_truncate_on_insert=1;
select * from file('t.parquet') prewhere x > 5 and (x > 6 or y > 3);
```

```cpp
/// Can start prewhere in next subgroup.
addTasksToReadColumns(row_group_idx, row_subgroup_idx + 1, ReadStage::PrewhereOffsetIndex, diff);
const auto & step = reader.steps[step_idx - 1];
if (step.filter_column_name && !step.filter_column_name->empty())
```
nit: When would it be present but empty?
```cpp
/// Can start prewhere in next subgroup.
addTasksToReadColumns(row_group_idx, row_subgroup_idx + 1, ReadStage::PrewhereOffsetIndex, diff);
const auto & step = reader.steps[step_idx - 1];
if (step.filter_column_name && !step.filter_column_name->empty())
```
I think it's better to execute the ExpressionActions even if no filtering was requested. Because:
- Otherwise, what's the meaning of `PrewhereExprStep::need_filter`? I.e. if a `PrewhereExprStep` with `need_filter == false` means "do nothing, as if this `PrewhereExprStep` didn't exist", then why does this `PrewhereExprStep` exist?
- Maybe there's some weird case where the expression outputs additional columns (which `applyPrewhere` would propagate through `idxs_in_output_block`) but doesn't do filtering. E.g. maybe the query optimizer figured out that the condition is always true and removed the filtering, but for some reason didn't move the expression evaluation from PREWHERE to SELECT. Or maybe `PrewhereInfo` can be used for other things that are not literal PREWHERE; it's just a mechanism to inject arbitrary expression evaluation into the middle of data reading.
```cpp
if (row_subgroup.filter.rows_pass == 0)
    break;
```
nit: Why create a `Task` in this case in the first place? `filter` is already known at `addTasksToReadColumns` time, right?
```cpp
/// If we're reusing filter.memory for a new step (multistage prewhere), free the old memory first.
if (row_subgroup.filter.memory)
    row_subgroup.filter.memory.reset(&diff);
row_subgroup.filter.memory = MemoryUsageToken(row_subgroup.filter.rows_total, &diff);
```

nit, suggested change:

```cpp
/// If we're reusing filter.memory for a new step (multistage prewhere), free the old memory first.
if (!row_subgroup.filter.memory)
    row_subgroup.filter.memory = MemoryUsageToken(row_subgroup.filter.rows_total, &diff);
```
```cpp
if (row_subgroup.filter.rows_pass == 0)
    break;
}
size_t prev_page_idx = column.data_pages_idx;
```
```cpp
chassert(task.row_subgroup_idx != UINT64_MAX);
```

nit: This doesn't do anything after the `row_group.subgroups.at(task.row_subgroup_idx)` above.
```cpp
for (const RowSubgroup & subgroup : row_group.subgroups)
    chassert(subgroup.stage.load(std::memory_order_relaxed) == ReadStage::Deallocated);
}
for (size_t i = 0; i < stages.size(); ++i)
```
No leak check anymore? The memory accounting is kind of sketchy and semi-manual, it's easy to have a bug there that would mostly go unnoticed, the check seems important.
Related reverts:
- Revert "Merge pull request #93542 from scanhex12/multistage_prewhere"
- Revert "Revert "Merge pull request #93542 from scanhex12/multistage_prewhere""
- Revert "Revert "Revert "Merge pull request #93542 from scanhex12/multistage_prewhere"""
- Revert "Revert "Revert "Revert "Merge pull request #93542 from scanhex12/multistage_prewhere""""
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Data lake prewhere and multistage prewhere in parquet reader v3. Resolves #89101.
Benchmarks:
Documentation entry for user-facing changes