1414#include < Processors/Formats/Impl/Parquet/Reader.h>
1515#include < Processors/Formats/Impl/Parquet/SchemaConverter.h>
1616#include < Storages/SelectQueryInfo.h>
17- #include < Storages/MergeTree/MergeTreeRangeReader.h>
18- #include < Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h>
1917
2018#include < mutex>
2119#include < lz4.h>
@@ -36,12 +34,6 @@ namespace DB::ErrorCodes
3634 extern const int CHECKSUM_DOESNT_MATCH;
3735}
3836
39- namespace ProfileEvents
40- {
41- extern const Event ParquetRowsFilterExpression;
42- extern const Event ParquetColumnsFilterExpression;
43- }
44-
4537namespace DB ::Parquet
4638{
4739
@@ -668,12 +660,11 @@ void Reader::preparePrewhere()
668660
669661 // / Convert ActionsDAG to ExpressionActions.
670662 std::optional<ExpressionActionsSettings> actions_settings;
671-
672- auto add_single_step = [&] (const ActionsDAG & dag, const String & filter_column_name, bool needs_filter, size_t step_idx)
663+ auto add_step = [&](const ActionsDAG & dag, const String & filter_column_name, bool needs_filter)
673664 {
674665 if (!actions_settings.has_value ())
675666 actions_settings.emplace ();
676- Step step { .actions = ExpressionActions (dag.clone (), actions_settings.value ()) };
667+ PrewhereStep step { .actions = ExpressionActions (dag.clone (), actions_settings.value ()) };
677668 if (needs_filter)
678669 step.filter_column_name = filter_column_name;
679670
@@ -685,13 +676,12 @@ void Reader::preparePrewhere()
685676 if (output_idx.has_value ())
686677 {
687678 OutputColumnInfo & output_info = output_columns[output_idx.value ()];
688- output_info.step_idx = step_idx + 1 ;
679+ output_info.use_prewhere = true ;
689680 bool only_for_prewhere = idx_in_output_block >= sample_block->columns ();
690681
691682 for (size_t primitive_idx = output_info.primitive_start ; primitive_idx < output_info.primitive_end ; ++primitive_idx)
692683 {
693- // primitive_columns[primitive_idx].use_prewhere = true;
694- primitive_columns[primitive_idx].steps_to_calculate .insert (steps.size () + 1 );
684+ primitive_columns[primitive_idx].use_prewhere = true ;
695685 primitive_columns[primitive_idx].only_for_prewhere = only_for_prewhere;
696686 }
697687 }
@@ -715,37 +705,7 @@ void Reader::preparePrewhere()
715705 }
716706 }
717707
718- steps.push_back (std::move (step));
719- };
720-
721- auto add_step = [&](const ActionsDAG & dag, const String & filter_column_name, bool needs_filter)
722- {
723- if (!actions_settings.has_value ())
724- actions_settings.emplace ();
725-
726- auto prewhere_info_patched = std::make_shared<PrewhereInfo>(dag.clone (), filter_column_name);
727- prewhere_info_patched->need_filter = needs_filter;
728- PrewhereExprInfo prewhere_expr_info;
729- bool success = tryBuildPrewhereSteps (
730- prewhere_info_patched,
731- *actions_settings,
732- prewhere_expr_info,
733- /* force_short_circuit_execution*/ false );
734-
735- if (success)
736- {
737- // / Add all steps separately.
738- for (size_t i = 0 ; i < prewhere_expr_info.steps .size (); ++i)
739- {
740- auto filter = prewhere_expr_info.steps [i];
741- add_single_step (filter->actions ->getActionsDAG (), filter->filter_column_name , true , i);
742- }
743- }
744- else
745- {
746- // / Execute everything as one large step
747- add_single_step (dag, filter_column_name, needs_filter, 0 );
748- }
708+ prewhere_steps.push_back (std::move (step));
749709 };
750710
751711 if (row_level_filter)
@@ -1118,7 +1078,6 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group)
11181078 }
11191079 }
11201080 row_group.intersected_row_ranges_after_column_index = std::move (row_ranges);
1121- row_group.next_subgroup_for_step = std::vector<std::atomic<size_t >>(steps.size () + 1 );
11221081}
11231082
11241083void Reader::decodeOffsetIndex (ColumnChunk & column, const RowGroup & row_group)
@@ -1400,7 +1359,7 @@ void Reader::decodePrimitiveColumn(ColumnChunk & column, const PrimitiveColumnIn
14001359 }
14011360
14021361 if (subchunk.arrays_offsets .empty () && subchunk.column ->size () != row_subgroup.filter .rows_pass )
1403- throw Exception (ErrorCodes::LOGICAL_ERROR, " Unexpected number of rows in column subchunk {} {} " , subchunk. column -> size (), row_subgroup. filter . rows_pass );
1362+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Unexpected number of rows in column subchunk" );
14041363
14051364 if (column_info.output_nullable )
14061365 {
@@ -2092,27 +2051,20 @@ ColumnPtr & Reader::getOrFormOutputColumn(RowSubgroup & row_subgroup, size_t idx
20922051 return state.column ;
20932052}
20942053
2095- void Reader::applyPrewhere (RowSubgroup & row_subgroup, const RowGroup & row_group, size_t step_idx )
2054+ void Reader::applyPrewhere (RowSubgroup & row_subgroup, const RowGroup & row_group)
20962055{
2056+ for (size_t step_idx = 0 ; step_idx < prewhere_steps.size (); ++step_idx)
20972057 {
2098- const Step & step = steps .at (step_idx - 1 );
2058+ const PrewhereStep & step = prewhere_steps .at (step_idx);
20992059
21002060 Block block;
21012061 for (size_t idx_in_output_block : step.input_idxs )
21022062 {
21032063 const ColumnWithTypeAndName & col = extended_sample_block.getByPosition (idx_in_output_block);
21042064 block.insert ({getOrFormOutputColumn (row_subgroup, idx_in_output_block), col.type , col.name });
21052065 }
2106- addDummyColumnWithRowCount (block, row_subgroup.filter .rows_pass );
2107-
2108- ProfileEvents::increment (ProfileEvents::ParquetRowsFilterExpression, block.rows ());
2109- ProfileEvents::increment (ProfileEvents::ParquetColumnsFilterExpression, block.columns ());
2066+ addDummyColumnWithRowCount (block, row_subgroup.filter .rows_total );
21102067
2111- if (block.rows () == 0 )
2112- {
2113- row_subgroup.filter .rows_pass = 0 ;
2114- return ;
2115- }
21162068 step.actions .execute (block);
21172069
21182070 for (const auto & [name, idx] : step.idxs_in_output_block )
@@ -2123,44 +2075,45 @@ void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_grou
21232075 }
21242076
21252077 // / If it's the last prewhere step, deallocate the columns that were only needed for prewhere.
2126- if (step_idx == steps .size ())
2078+ if (step_idx == prewhere_steps .size () - 1 )
21272079 {
21282080 while (row_subgroup.output .size () > sample_block->columns ())
21292081 row_subgroup.output .pop_back (); // because OutputColumnState has no move constructor
21302082 }
21312083
21322084 if (!step.filter_column_name .has_value ())
2133- return ;
2085+ continue ;
21342086
21352087 ColumnPtr filter_column = block.getByName (step.filter_column_name .value ()).column ;
21362088 filter_column = FilterDescription::preprocessFilterColumn (std::move (filter_column));
21372089 const IColumnFilter & filter = typeid_cast<const ColumnUInt8 &>(*filter_column).getData ();
21382090 chassert (filter.size () == row_subgroup.filter .rows_pass );
21392091
2140- for (size_t i = 0 ; i < filter_column->size (); ++i)
2141- {
2142- Field field;
2143- filter_column->get (i, field);
2144- }
21452092 size_t rows_pass = countBytesInFilter (filter.data (), 0 , filter.size ());
21462093 if (rows_pass == 0 || !row_group.need_to_process )
21472094 {
21482095 // / Whole row group was filtered out.
21492096 row_subgroup.filter .rows_pass = 0 ;
21502097 return ;
21512098 }
2099+ if (rows_pass == filter.size ())
2100+ // / Nothing was filtered out.
2101+ continue ;
21522102
21532103 // / Filter columns that were already read.
2104+
21542105 for (auto & state : row_subgroup.output )
21552106 if (state.column )
21562107 state.column = state.column ->filter (filter, /* result_size_hint=*/ rows_pass);
21572108
21582109 // / Expand the filter to correspond to all column subchunk rows, rather than only rows that
21592110 // / passed previous filters (previous prewhere steps).
2111+
21602112 auto mut_col = IColumn::mutate (std::move (filter_column));
21612113 auto & mut_filter = typeid_cast<ColumnUInt8 &>(*mut_col);
21622114 if (row_subgroup.filter .rows_pass != row_subgroup.filter .rows_total )
21632115 mut_filter.expand (row_subgroup.filter .filter , /* inverted*/ false );
2116+
21642117 row_subgroup.filter .filter = std::move (mut_filter.getData ());
21652118 row_subgroup.filter .rows_pass = rows_pass;
21662119 }
0 commit comments