Skip to content

Commit af12721

Browse files
Merge branch 'backport/25.8/93827' into backport-93827-2
1 parent 11117b5 commit af12721

File tree

8 files changed

+76
-34
lines changed

8 files changed

+76
-34
lines changed

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
352352

353353
for (const auto * projection : global_ctx->projections_to_rebuild)
354354
{
355-
Names projection_columns_vec = projection->getRequiredColumns();
356-
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
357-
std::inserter(key_columns, key_columns.end()));
355+
for (const auto & column : projection->getRequiredColumns())
356+
{
357+
if (projection->with_parent_part_offset && column == "_part_offset")
358+
continue;
359+
360+
key_columns.insert(getColumnNameInStorage(column, storage_columns));
361+
}
358362
}
359363

360364
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
@@ -888,16 +892,12 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
888892
}
889893

890894

891-
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
895+
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block, UInt64 starting_offset) const
892896
{
893897
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
894898
{
895899
const auto & projection = *global_ctx->projections_to_rebuild[i];
896-
Block block_with_required_columns;
897-
for (const auto & name : projection.getRequiredColumns())
898-
if (name != "_part_offset")
899-
block_with_required_columns.insert(block.getByName(name));
900-
Block block_to_squash = projection.calculate(block_with_required_columns, global_ctx->context);
900+
Block block_to_squash = projection.calculate(block, starting_offset, global_ctx->context);
901901
/// Avoid replacing the projection squash header if nothing was generated (it used to return an empty block)
902902
if (block_to_squash.rows() == 0)
903903
return;
@@ -1013,6 +1013,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
10131013
}
10141014
}
10151015

1016+
size_t starting_offset = global_ctx->rows_written;
10161017
global_ctx->rows_written += block.rows();
10171018
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
10181019

@@ -1022,7 +1023,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
10221023
block, MergeTreeData::getMinMaxColumnsNames(global_ctx->metadata_snapshot->getPartitionKey()));
10231024
}
10241025

1025-
calculateProjections(block);
1026+
calculateProjections(block, starting_offset);
10261027

10271028
UInt64 result_rows = 0;
10281029
UInt64 result_bytes = 0;

src/Storages/MergeTree/MergeTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ class MergeTask
314314
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
315315

316316
void prepareProjectionsToMergeAndRebuild() const;
317-
void calculateProjections(const Block & block) const;
317+
void calculateProjections(const Block & block, UInt64 starting_offset) const;
318318
void finalizeProjections() const;
319319
void constructTaskForProjectionPartsMerge() const;
320320
bool executeMergeProjections() const;

src/Storages/MergeTree/MergeTreeDataWriter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ MergeTreeTemporaryPartPtr MergeTreeDataWriter::writeTempPartImpl(
808808
Block projection_block;
809809
{
810810
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataWriterProjectionsCalculationMicroseconds);
811-
projection_block = projection.calculate(block, context, perm_ptr);
811+
projection_block = projection.calculate(block, 0, context, perm_ptr);
812812
LOG_DEBUG(
813813
log, "Spent {} ms calculating projection {} for the part {}", watch.elapsed() / 1000, projection.name, new_data_part->name);
814814
}

src/Storages/MergeTree/MutateTask.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1369,13 +1369,14 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
13691369
if (ctx->count_lightweight_deleted_rows)
13701370
existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);
13711371

1372+
UInt64 starting_offset = (*ctx->mutate_entry)->rows_written;
13721373
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
13731374
{
13741375
Chunk squashed_chunk;
13751376

13761377
{
13771378
ProfileEventTimeIncrement<Microseconds> projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
1378-
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context);
1379+
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, starting_offset, ctx->context);
13791380

13801381
/// Everything is deleted by lighweight delete
13811382
if (block_to_squash.rows() == 0)

src/Storages/ProjectionsDescription.cpp

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
307307
result.sample_block.erase("_part_offset");
308308
result.sample_block.insert(std::move(new_column));
309309
result.with_parent_part_offset = true;
310+
std::erase_if(result.required_columns, [](const String & s) { return s.contains("_part_offset"); });
310311
}
311312

312313
auto block = result.sample_block;
@@ -436,7 +437,8 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription &
436437
*this = getProjectionFromAST(definition_ast, new_columns, query_context);
437438
}
438439

439-
Block ProjectionDescription::calculate(const Block & block, ContextPtr context, const IColumnPermutation * perm_ptr) const
440+
Block ProjectionDescription::calculate(
441+
const Block & block, UInt64 starting_offset, ContextPtr context, const IColumnPermutation * perm_ptr) const
440442
{
441443
auto mut_context = Context::createCopy(context);
442444
/// We ignore aggregate_functions_null_for_empty cause it changes aggregate function types.
@@ -458,31 +460,34 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context,
458460
makeASTFunction("equals", std::make_shared<ASTIdentifier>(RowExistsColumn::name), std::make_shared<ASTLiteral>(1)));
459461
}
460462

461-
/// Create "_part_offset" column when needed for projection with parent part offsets
463+
/// Only keep required columns
462464
Block source_block = block;
465+
for (const auto & column : required_columns)
466+
source_block.insert(block.getByName(column));
467+
468+
/// Create "_part_offset" column when needed for projection with parent part offsets
463469
if (with_parent_part_offset)
464470
{
465471
chassert(sample_block.has("_parent_part_offset"));
466-
467-
/// Add "_part_offset" column if not present (needed for insertions but not mutations - materialize projections)
468-
if (!source_block.has("_part_offset"))
472+
chassert(!source_block.has("_part_offset"));
473+
auto uint64 = std::make_shared<DataTypeUInt64>();
474+
auto column = uint64->createColumn();
475+
auto & offset = assert_cast<ColumnUInt64 &>(*column).getData();
476+
offset.resize_exact(block.rows());
477+
if (perm_ptr)
469478
{
470-
auto uint64 = std::make_shared<DataTypeUInt64>();
471-
auto column = uint64->createColumn();
472-
auto & offset = assert_cast<ColumnUInt64 &>(*column).getData();
473-
offset.resize_exact(block.rows());
474-
if (perm_ptr)
475-
{
476-
for (size_t i = 0; i < block.rows(); ++i)
477-
offset[(*perm_ptr)[i]] = i;
478-
}
479-
else
480-
{
481-
iota(offset.data(), offset.size(), UInt64(0));
482-
}
483-
484-
source_block.insert({std::move(column), std::move(uint64), "_part_offset"});
479+
/// Insertion path
480+
chassert(starting_offset == 0);
481+
for (size_t i = 0; i < block.rows(); ++i)
482+
offset[(*perm_ptr)[i]] = i;
485483
}
484+
else
485+
{
486+
/// Rebuilding path
487+
iota(offset.data(), offset.size(), starting_offset);
488+
}
489+
490+
source_block.insert({std::move(column), std::move(uint64), "_part_offset"});
486491
}
487492

488493
auto builder = InterpreterSelectQuery(

src/Storages/ProjectionsDescription.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ struct ProjectionDescription
103103
* @brief Calculates the projection result for a given input block.
104104
*
105105
* @param block The input block used to evaluate the projection.
106+
* @param starting_offset The absolute starting row index of the current `block` within the
107+
* source data part. It is used to calculate the value of the virtual `_part_offset`
108+
* column (i.e., `_part_offset = starting_offset + row_index`). This column is
109+
* essential for mapping projection rows back to their original positions in the
110+
* parent part during merge or mutation.
106111
* @param context The query context. A copy will be made internally with adjusted settings.
107112
* @param perm_ptr Optional pointer to a permutation vector. If provided, it is used to map
108113
* the output rows back to their original order in the parent block. This is necessary
@@ -111,7 +116,7 @@ struct ProjectionDescription
111116
*
112117
* @return The resulting block after executing the projection query.
113118
*/
114-
Block calculate(const Block & block, ContextPtr context, const IColumnPermutation * perm_ptr = nullptr) const;
119+
Block calculate(const Block & block, UInt64 starting_offset, ContextPtr context, const IColumnPermutation * perm_ptr = nullptr) const;
115120

116121
String getDirectoryName() const { return name + ".proj"; }
117122
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
200000
2+
200000
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
DROP TABLE IF EXISTS test;
2+
3+
CREATE TABLE test
4+
(
5+
`a` Int32,
6+
`b` Int32,
7+
PROJECTION p
8+
(
9+
SELECT
10+
a,
11+
b,
12+
_part_offset
13+
ORDER BY b
14+
)
15+
)
16+
ENGINE = ReplacingMergeTree
17+
ORDER BY a
18+
SETTINGS index_granularity_bytes = 10485760, index_granularity = 8192, deduplicate_merge_projection_mode = 'rebuild';
19+
20+
INSERT INTO test SELECT number * 3, rand() FROM numbers(100000);
21+
INSERT INTO test SELECT number * 3 + 1, rand() FROM numbers(100000);
22+
SELECT sum(l._part_offset = r._parent_part_offset) FROM test l JOIN mergeTreeProjection(currentDatabase(), test, p) r USING (a) SETTINGS enable_analyzer = 1;
23+
24+
OPTIMIZE TABLE test FINAL;
25+
26+
SELECT sum(l._part_offset = r._parent_part_offset) FROM test l JOIN mergeTreeProjection(currentDatabase(), test, p) r USING (a) SETTINGS enable_analyzer = 1;
27+
28+
DROP TABLE test;

0 commit comments

Comments
 (0)