Skip to content

Commit 3ec6ce6

Browse files
Backport #95496 to 26.1: Revert "Merge pull request #93542 from scanhex12/multistage_prewhere"
1 parent a190f94 commit 3ec6ce6

File tree

14 files changed

+182
-645
lines changed

14 files changed

+182
-645
lines changed

src/Common/ProfileEvents.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,8 +1254,6 @@ The server successfully detected this situation and will download merged part fr
12541254
M(ParquetPrefetcherReadRandomRead, "The total number of reads with ReadMode::RandomRead by DB::Parquet::Prefetcher", ValueType::Number) \
12551255
M(ParquetPrefetcherReadSeekAndRead, "The total number of reads with ReadMode::SeekAndRead by DB::Parquet::Prefetcher", ValueType::Number) \
12561256
M(ParquetPrefetcherReadEntireFile, "The total number of read with ReadMode::EntireFileIsInMemory by DB::Parquet::Prefetcher", ValueType::Number) \
1257-
M(ParquetRowsFilterExpression, "The total number of rows that were passed through filter", ValueType::Number) \
1258-
M(ParquetColumnsFilterExpression, "The total number of columns that were passed through filter", ValueType::Number) \
12591257
M(FilterTransformPassedRows, "Number of rows that passed the filter in the query", ValueType::Number) \
12601258
M(FilterTransformPassedBytes, "Number of bytes that passed the filter in the query", ValueType::Bytes) \
12611259
\

src/Processors/Formats/Impl/Parquet/ReadCommon.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@ enum class ReadStage
9393
BloomFilterBlocksOrDictionary,
9494
ColumnIndexAndOffsetIndex,
9595

96-
OffsetIndex,
97-
ColumnData,
96+
PrewhereOffsetIndex,
97+
PrewhereData,
98+
99+
MainOffsetIndex, // "main" means columns that are not in prewhere
100+
MainData,
98101

99102
Deliver,
100103

src/Processors/Formats/Impl/Parquet/ReadManager.cpp

Lines changed: 129 additions & 336 deletions
Large diffs are not rendered by default.

src/Processors/Formats/Impl/Parquet/ReadManager.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ struct AtomicBitSet
1313

1414
bool set(size_t i, std::memory_order memory_order)
1515
{
16-
UInt64 mask = 1ull << (i & 63);
16+
UInt64 mask = 1ul << (i & 63);
1717
UInt64 x = a[i >> 6].fetch_or(mask, memory_order);
1818
return (x & mask) == 0;
1919
}
2020
bool unset(size_t i, std::memory_order memory_order)
2121
{
22-
UInt64 mask = 1ull << (i & 63);
22+
UInt64 mask = 1ul << (i & 63);
2323
UInt64 x = a[i >> 6].fetch_and(~mask, memory_order);
2424
return (x & mask) != 0;
2525
}
@@ -67,7 +67,6 @@ class ReadManager
6767
struct Task
6868
{
6969
ReadStage stage;
70-
size_t step_idx = 0; /// 0 = main step, (>=1) = prewhere steps
7170
size_t row_group_idx;
7271
size_t row_subgroup_idx = UINT64_MAX;
7372
size_t column_idx = UINT64_MAX;
@@ -119,16 +118,15 @@ class ReadManager
119118
void runBatchOfTasks(const std::vector<Task> & tasks) noexcept;
120119
void scheduleTasksIfNeeded(ReadStage stage_idx);
121120
void finishRowGroupStage(size_t row_group_idx, ReadStage stage, MemoryUsageDiff & diff);
122-
void finishRowSubgroupStage(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, size_t step_idx, MemoryUsageDiff & diff);
121+
void finishRowSubgroupStage(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, MemoryUsageDiff & diff);
123122
/// Free some memory ColumnChunk that's not needed after decoding is done in all row sugroups.
124123
/// Call sites should be careful to not call it from multiple threads in parallel.
125124
void clearColumnChunk(ColumnChunk & column, MemoryUsageDiff & diff);
126125
void clearRowSubgroup(RowSubgroup & row_subgroup, MemoryUsageDiff & diff);
127126
void setTasksToSchedule(size_t row_group_idx, ReadStage stage, std::vector<Task> add_tasks, MemoryUsageDiff & diff);
128-
void addTasksToReadColumns(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, size_t step_idx, MemoryUsageDiff & diff);
127+
void addTasksToReadColumns(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, MemoryUsageDiff & diff);
129128
void advanceDeliveryPtrIfNeeded(size_t row_group_idx, MemoryUsageDiff & diff);
130129
void flushMemoryUsageDiff(MemoryUsageDiff && diff);
131-
std::string collectDeadlockDiagnostics();
132130
};
133131

134132
}

src/Processors/Formats/Impl/Parquet/Reader.cpp

Lines changed: 18 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
#include <Processors/Formats/Impl/Parquet/Reader.h>
1515
#include <Processors/Formats/Impl/Parquet/SchemaConverter.h>
1616
#include <Storages/SelectQueryInfo.h>
17-
#include <Storages/MergeTree/MergeTreeRangeReader.h>
18-
#include <Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h>
1917

2018
#include <mutex>
2119
#include <lz4.h>
@@ -36,12 +34,6 @@ namespace DB::ErrorCodes
3634
extern const int CHECKSUM_DOESNT_MATCH;
3735
}
3836

39-
namespace ProfileEvents
40-
{
41-
extern const Event ParquetRowsFilterExpression;
42-
extern const Event ParquetColumnsFilterExpression;
43-
}
44-
4537
namespace DB::Parquet
4638
{
4739

@@ -668,12 +660,11 @@ void Reader::preparePrewhere()
668660

669661
/// Convert ActionsDAG to ExpressionActions.
670662
std::optional<ExpressionActionsSettings> actions_settings;
671-
672-
auto add_single_step = [&] (const ActionsDAG & dag, const String & filter_column_name, bool needs_filter, size_t step_idx)
663+
auto add_step = [&](const ActionsDAG & dag, const String & filter_column_name, bool needs_filter)
673664
{
674665
if (!actions_settings.has_value())
675666
actions_settings.emplace();
676-
Step step { .actions = ExpressionActions(dag.clone(), actions_settings.value()) };
667+
PrewhereStep step { .actions = ExpressionActions(dag.clone(), actions_settings.value()) };
677668
if (needs_filter)
678669
step.filter_column_name = filter_column_name;
679670

@@ -685,13 +676,12 @@ void Reader::preparePrewhere()
685676
if (output_idx.has_value())
686677
{
687678
OutputColumnInfo & output_info = output_columns[output_idx.value()];
688-
output_info.step_idx = step_idx + 1;
679+
output_info.use_prewhere = true;
689680
bool only_for_prewhere = idx_in_output_block >= sample_block->columns();
690681

691682
for (size_t primitive_idx = output_info.primitive_start; primitive_idx < output_info.primitive_end; ++primitive_idx)
692683
{
693-
//primitive_columns[primitive_idx].use_prewhere = true;
694-
primitive_columns[primitive_idx].steps_to_calculate.insert(steps.size() + 1);
684+
primitive_columns[primitive_idx].use_prewhere = true;
695685
primitive_columns[primitive_idx].only_for_prewhere = only_for_prewhere;
696686
}
697687
}
@@ -715,37 +705,7 @@ void Reader::preparePrewhere()
715705
}
716706
}
717707

718-
steps.push_back(std::move(step));
719-
};
720-
721-
auto add_step = [&](const ActionsDAG & dag, const String & filter_column_name, bool needs_filter)
722-
{
723-
if (!actions_settings.has_value())
724-
actions_settings.emplace();
725-
726-
auto prewhere_info_patched = std::make_shared<PrewhereInfo>(dag.clone(), filter_column_name);
727-
prewhere_info_patched->need_filter = needs_filter;
728-
PrewhereExprInfo prewhere_expr_info;
729-
bool success = tryBuildPrewhereSteps(
730-
prewhere_info_patched,
731-
*actions_settings,
732-
prewhere_expr_info,
733-
/*force_short_circuit_execution*/ false);
734-
735-
if (success)
736-
{
737-
/// Add all steps separately.
738-
for (size_t i = 0; i < prewhere_expr_info.steps.size(); ++i)
739-
{
740-
auto filter = prewhere_expr_info.steps[i];
741-
add_single_step(filter->actions->getActionsDAG(), filter->filter_column_name, true, i);
742-
}
743-
}
744-
else
745-
{
746-
/// Execute everything as one large step
747-
add_single_step(dag, filter_column_name, needs_filter, 0);
748-
}
708+
prewhere_steps.push_back(std::move(step));
749709
};
750710

751711
if (row_level_filter)
@@ -1118,7 +1078,6 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group)
11181078
}
11191079
}
11201080
row_group.intersected_row_ranges_after_column_index = std::move(row_ranges);
1121-
row_group.next_subgroup_for_step = std::vector<std::atomic<size_t>>(steps.size() + 1);
11221081
}
11231082

11241083
void Reader::decodeOffsetIndex(ColumnChunk & column, const RowGroup & row_group)
@@ -1400,7 +1359,7 @@ void Reader::decodePrimitiveColumn(ColumnChunk & column, const PrimitiveColumnIn
14001359
}
14011360

14021361
if (subchunk.arrays_offsets.empty() && subchunk.column->size() != row_subgroup.filter.rows_pass)
1403-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of rows in column subchunk {} {}", subchunk.column->size(), row_subgroup.filter.rows_pass);
1362+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of rows in column subchunk");
14041363

14051364
if (column_info.output_nullable)
14061365
{
@@ -2092,27 +2051,20 @@ ColumnPtr & Reader::getOrFormOutputColumn(RowSubgroup & row_subgroup, size_t idx
20922051
return state.column;
20932052
}
20942053

2095-
void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group, size_t step_idx)
2054+
void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group)
20962055
{
2056+
for (size_t step_idx = 0; step_idx < prewhere_steps.size(); ++step_idx)
20972057
{
2098-
const Step & step = steps.at(step_idx - 1);
2058+
const PrewhereStep & step = prewhere_steps.at(step_idx);
20992059

21002060
Block block;
21012061
for (size_t idx_in_output_block : step.input_idxs)
21022062
{
21032063
const ColumnWithTypeAndName & col = extended_sample_block.getByPosition(idx_in_output_block);
21042064
block.insert({getOrFormOutputColumn(row_subgroup, idx_in_output_block), col.type, col.name});
21052065
}
2106-
addDummyColumnWithRowCount(block, row_subgroup.filter.rows_pass);
2107-
2108-
ProfileEvents::increment(ProfileEvents::ParquetRowsFilterExpression, block.rows());
2109-
ProfileEvents::increment(ProfileEvents::ParquetColumnsFilterExpression, block.columns());
2066+
addDummyColumnWithRowCount(block, row_subgroup.filter.rows_total);
21102067

2111-
if (block.rows() == 0)
2112-
{
2113-
row_subgroup.filter.rows_pass = 0;
2114-
return;
2115-
}
21162068
step.actions.execute(block);
21172069

21182070
for (const auto & [name, idx] : step.idxs_in_output_block)
@@ -2123,44 +2075,45 @@ void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_grou
21232075
}
21242076

21252077
/// If it's the last prewhere step, deallocate the columns that were only needed for prewhere.
2126-
if (step_idx == steps.size())
2078+
if (step_idx == prewhere_steps.size() - 1)
21272079
{
21282080
while (row_subgroup.output.size() > sample_block->columns())
21292081
row_subgroup.output.pop_back(); // because OutputColumnState has no move constructor
21302082
}
21312083

21322084
if (!step.filter_column_name.has_value())
2133-
return;
2085+
continue;
21342086

21352087
ColumnPtr filter_column = block.getByName(step.filter_column_name.value()).column;
21362088
filter_column = FilterDescription::preprocessFilterColumn(std::move(filter_column));
21372089
const IColumnFilter & filter = typeid_cast<const ColumnUInt8 &>(*filter_column).getData();
21382090
chassert(filter.size() == row_subgroup.filter.rows_pass);
21392091

2140-
for (size_t i = 0; i < filter_column->size(); ++i)
2141-
{
2142-
Field field;
2143-
filter_column->get(i, field);
2144-
}
21452092
size_t rows_pass = countBytesInFilter(filter.data(), 0, filter.size());
21462093
if (rows_pass == 0 || !row_group.need_to_process)
21472094
{
21482095
/// Whole row group was filtered out.
21492096
row_subgroup.filter.rows_pass = 0;
21502097
return;
21512098
}
2099+
if (rows_pass == filter.size())
2100+
/// Nothing was filtered out.
2101+
continue;
21522102

21532103
/// Filter columns that were already read.
2104+
21542105
for (auto & state : row_subgroup.output)
21552106
if (state.column)
21562107
state.column = state.column->filter(filter, /*result_size_hint=*/ rows_pass);
21572108

21582109
/// Expand the filter to correspond to all column subchunk rows, rather than only rows that
21592110
/// passed previous filters (previous prewhere steps).
2111+
21602112
auto mut_col = IColumn::mutate(std::move(filter_column));
21612113
auto & mut_filter = typeid_cast<ColumnUInt8 &>(*mut_col);
21622114
if (row_subgroup.filter.rows_pass != row_subgroup.filter.rows_total)
21632115
mut_filter.expand(row_subgroup.filter.filter, /*inverted*/ false);
2116+
21642117
row_subgroup.filter.filter = std::move(mut_filter.getData());
21652118
row_subgroup.filter.rows_pass = rows_pass;
21662119
}

src/Processors/Formats/Impl/Parquet/Reader.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ struct Reader
174174

175175
bool use_bloom_filter = false;
176176
const KeyCondition * column_index_condition = nullptr;
177-
std::unordered_set<size_t> steps_to_calculate;
177+
bool use_prewhere = false;
178178
bool only_for_prewhere = false; // can remove this column after applying prewhere
179179

180180
bool used_by_key_condition = false;
@@ -206,7 +206,7 @@ struct Reader
206206
/// `rep - 1` is index in ColumnChunk::arrays_offsets.
207207
UInt8 rep = 0;
208208

209-
size_t step_idx = 0;
209+
bool use_prewhere = false;
210210
};
211211

212212
struct RowSet
@@ -416,15 +416,19 @@ struct Reader
416416

417417

418418
/// Fields below are used only by ReadManager.
419-
std::vector<std::atomic<size_t>> next_subgroup_for_step;
420419

420+
/// Indexes of the first subgroup that didn't finish
421+
/// {prewhere, reading main columns, delivering final chunk}.
422+
/// delivery_ptr <= read_ptr <= prewhere_ptr <= subgroups.size()
423+
std::atomic<size_t> prewhere_ptr {0};
424+
std::atomic<size_t> read_ptr {0};
421425
std::atomic<size_t> delivery_ptr {0};
422426

423427
std::atomic<ReadStage> stage {ReadStage::NotStarted};
424428
std::atomic<size_t> stage_tasks_remaining {0};
425429
};
426430

427-
struct Step
431+
struct PrewhereStep
428432
{
429433
ExpressionActions actions;
430434
std::optional<String> filter_column_name {};
@@ -473,7 +477,7 @@ struct Reader
473477
/// (Why not just add them to sample_block? To avoid unnecessarily applying filter to them.)
474478
Block extended_sample_block;
475479
DataTypes extended_sample_block_data_types; // = extended_sample_block.getDataTypes()
476-
std::vector<Step> steps;
480+
std::vector<PrewhereStep> prewhere_steps;
477481

478482
std::optional<KeyCondition> bloom_filter_condition;
479483

@@ -515,7 +519,7 @@ struct Reader
515519
MutableColumnPtr formOutputColumn(RowSubgroup & row_subgroup, size_t output_column_idx, size_t num_rows);
516520
ColumnPtr & getOrFormOutputColumn(RowSubgroup & row_subgroup, size_t idx_in_output_block);
517521

518-
void applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group, size_t step_idx);
522+
void applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group);
519523

520524
private:
521525
struct BloomFilterLookup : public KeyCondition::BloomFilter

src/Storages/MergeTree/MergeTreeSelectProcessor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <Common/ElapsedTimeProfileEventIncrement.h>
2121
#include <Common/OpenTelemetryTraceContext.h>
2222
#include <Storages/MergeTree/MergeTreeReadTask.h>
23-
#include <Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h>
2423

2524
namespace
2625
{
@@ -166,6 +165,12 @@ String MergeTreeSelectProcessor::getName() const
166165
return fmt::format("MergeTreeSelect(pool: {}, algorithm: {})", pool->getName(), algorithm->getName());
167166
}
168167

168+
bool tryBuildPrewhereSteps(
169+
PrewhereInfoPtr prewhere_info,
170+
const ExpressionActionsSettings & actions_settings,
171+
PrewhereExprInfo & prewhere,
172+
bool force_short_circuit_execution);
173+
169174
PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(
170175
const FilterDAGInfoPtr & row_level_filter,
171176
const PrewhereInfoPtr & prewhere_info,

src/Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
#include <Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h>
1+
#include <Functions/CastOverloadResolver.h>
2+
#include <Functions/FunctionsLogical.h>
3+
#include <Functions/IFunctionAdaptors.h>
4+
#include <Storages/SelectQueryInfo.h>
5+
#include <Storages/MergeTree/MergeTreeRangeReader.h>
6+
#include <DataTypes/DataTypeNullable.h>
7+
#include <DataTypes/DataTypeString.h>
8+
#include <DataTypes/DataTypeLowCardinality.h>
9+
#include <Interpreters/ExpressionActions.h>
210

311

412
namespace DB

src/Storages/MergeTree/MergeTreeSplitPrewhereIntoReadSteps.h

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include <Formats/FormatParserSharedResources.h>
2020
#include <memory>
2121
#include <string>
22-
#include <type_traits>
2322

2423
#include <Common/ErrorCodes.h>
2524
#include <Common/filesystemHelpers.h>

0 commit comments

Comments
 (0)