Skip to content

Commit 52af79c

Browse files
Revert "Revert "Revert "Merge pull request #93542 from scanhex12/multistage_prewhere"""
1 parent 9248c1a commit 52af79c

21 files changed

+278
-1026
lines changed

src/Common/ProfileEvents.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,8 +1259,6 @@ The server successfully detected this situation and will download merged part fr
12591259
M(ParquetPrefetcherReadRandomRead, "The total number of reads with ReadMode::RandomRead by DB::Parquet::Prefetcher", ValueType::Number) \
12601260
M(ParquetPrefetcherReadSeekAndRead, "The total number of reads with ReadMode::SeekAndRead by DB::Parquet::Prefetcher", ValueType::Number) \
12611261
M(ParquetPrefetcherReadEntireFile, "The total number of read with ReadMode::EntireFileIsInMemory by DB::Parquet::Prefetcher", ValueType::Number) \
1262-
M(ParquetRowsFilterExpression, "The total number of rows that were passed through filter", ValueType::Number) \
1263-
M(ParquetColumnsFilterExpression, "The total number of columns that were passed through filter", ValueType::Number) \
12641262
M(FilterTransformPassedRows, "Number of rows that passed the filter in the query", ValueType::Number) \
12651263
M(FilterTransformPassedBytes, "Number of bytes that passed the filter in the query", ValueType::Bytes) \
12661264
\

src/Processors/Formats/Impl/Parquet/Decoding.cpp

Lines changed: 43 additions & 299 deletions
Large diffs are not rendered by default.

src/Processors/Formats/Impl/Parquet/Decoding.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct Dictionary
5353
struct PageDecoder
5454
{
5555
virtual void skip(size_t num_values) = 0;
56-
virtual void decode(size_t num_values, IColumn & col, const UInt8 * filter, size_t filter_offset) = 0;
56+
virtual void decode(size_t num_values, IColumn &) = 0;
5757

5858
explicit PageDecoder(std::span<const char> data_) : data(data_.data()), end(data_.data() + data_.size()) {}
5959
virtual ~PageDecoder() = default;

src/Processors/Formats/Impl/Parquet/ReadCommon.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@ enum class ReadStage
9393
BloomFilterBlocksOrDictionary,
9494
ColumnIndexAndOffsetIndex,
9595

96-
OffsetIndex,
97-
ColumnData,
96+
PrewhereOffsetIndex,
97+
PrewhereData,
98+
99+
MainOffsetIndex, // "main" means columns that are not in prewhere
100+
MainData,
98101

99102
Deliver,
100103

src/Processors/Formats/Impl/Parquet/ReadManager.cpp

Lines changed: 146 additions & 301 deletions
Large diffs are not rendered by default.

src/Processors/Formats/Impl/Parquet/ReadManager.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ struct AtomicBitSet
1313

1414
bool set(size_t i, std::memory_order memory_order)
1515
{
16-
UInt64 mask = 1ull << (i & 63);
16+
UInt64 mask = 1ul << (i & 63);
1717
UInt64 x = a[i >> 6].fetch_or(mask, memory_order);
1818
return (x & mask) == 0;
1919
}
2020
bool unset(size_t i, std::memory_order memory_order)
2121
{
22-
UInt64 mask = 1ull << (i & 63);
22+
UInt64 mask = 1ul << (i & 63);
2323
UInt64 x = a[i >> 6].fetch_and(~mask, memory_order);
2424
return (x & mask) != 0;
2525
}
@@ -67,7 +67,6 @@ class ReadManager
6767
struct Task
6868
{
6969
ReadStage stage;
70-
size_t step_idx = 0; /// 0 = main step, (>=1) = prewhere steps
7170
size_t row_group_idx;
7271
size_t row_subgroup_idx = UINT64_MAX;
7372
size_t column_idx = UINT64_MAX;
@@ -119,16 +118,15 @@ class ReadManager
119118
void runBatchOfTasks(const std::vector<Task> & tasks) noexcept;
120119
void scheduleTasksIfNeeded(ReadStage stage_idx);
121120
void finishRowGroupStage(size_t row_group_idx, ReadStage stage, MemoryUsageDiff & diff);
122-
void finishRowSubgroupStage(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, size_t step_idx, MemoryUsageDiff & diff);
121+
void finishRowSubgroupStage(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, MemoryUsageDiff & diff);
123122
/// Free some memory ColumnChunk that's not needed after decoding is done in all row sugroups.
124123
/// Call sites should be careful to not call it from multiple threads in parallel.
125124
void clearColumnChunk(ColumnChunk & column, MemoryUsageDiff & diff);
126125
void clearRowSubgroup(RowSubgroup & row_subgroup, MemoryUsageDiff & diff);
127126
void setTasksToSchedule(size_t row_group_idx, ReadStage stage, std::vector<Task> add_tasks, MemoryUsageDiff & diff);
128-
void addTasksToReadColumns(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, size_t step_idx, MemoryUsageDiff & diff);
127+
void addTasksToReadColumns(size_t row_group_idx, size_t row_subgroup_idx, ReadStage stage, MemoryUsageDiff & diff);
129128
void advanceDeliveryPtrIfNeeded(size_t row_group_idx, MemoryUsageDiff & diff);
130129
void flushMemoryUsageDiff(MemoryUsageDiff && diff);
131-
std::string collectDeadlockDiagnostics();
132130
};
133131

134132
}

0 commit comments

Comments
 (0)