Skip to content

Commit 6364315

Browse files
KochetovNicolaiamosbird
authored andcommitted
Backport #93827 to 25.11: Fix rebuilding of projections with _part_offset during merge
1 parent 07c7d64 commit 6364315

File tree

9 files changed

+87
-37
lines changed

9 files changed

+87
-37
lines changed

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
358358

359359
for (const auto * projection : global_ctx->projections_to_rebuild)
360360
{
361-
Names projection_columns_vec = projection->getRequiredColumns();
362-
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
363-
std::inserter(key_columns, key_columns.end()));
361+
for (const auto & column : projection->getRequiredColumns())
362+
{
363+
if (projection->with_parent_part_offset && column == "_part_offset")
364+
continue;
365+
366+
key_columns.insert(column);
367+
}
364368
}
365369

366370
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
@@ -948,16 +952,12 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
948952
}
949953

950954

951-
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
955+
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block, UInt64 starting_offset) const
952956
{
953957
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
954958
{
955959
const auto & projection = *global_ctx->projections_to_rebuild[i];
956-
Block block_with_required_columns;
957-
for (const auto & name : projection.getRequiredColumns())
958-
if (name != "_part_offset")
959-
block_with_required_columns.insert(block.getByName(name));
960-
Block block_to_squash = projection.calculate(block_with_required_columns, global_ctx->context);
960+
Block block_to_squash = projection.calculate(block, starting_offset, global_ctx->context);
961961
/// Avoid replacing the projection squash header if nothing was generated (it used to return an empty block)
962962
if (block_to_squash.rows() == 0)
963963
return;
@@ -1073,6 +1073,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
10731073
}
10741074
}
10751075

1076+
size_t starting_offset = global_ctx->rows_written;
10761077
global_ctx->rows_written += block.rows();
10771078
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
10781079

@@ -1082,7 +1083,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
10821083
block, MergeTreeData::getMinMaxColumnsNames(global_ctx->metadata_snapshot->getPartitionKey()));
10831084
}
10841085

1085-
calculateProjections(block);
1086+
calculateProjections(block, starting_offset);
10861087

10871088
UInt64 result_rows = 0;
10881089
UInt64 result_bytes = 0;

src/Storages/MergeTree/MergeTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ class MergeTask
315315
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
316316

317317
void prepareProjectionsToMergeAndRebuild() const;
318-
void calculateProjections(const Block & block) const;
318+
void calculateProjections(const Block & block, UInt64 starting_offset) const;
319319
void finalizeProjections() const;
320320
void constructTaskForProjectionPartsMerge() const;
321321
bool executeMergeProjections() const;

src/Storages/MergeTree/MergeTreeDataWriter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ MergeTreeTemporaryPartPtr MergeTreeDataWriter::writeTempPartImpl(
946946
Block projection_block;
947947
{
948948
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataWriterProjectionsCalculationMicroseconds);
949-
projection_block = projection.calculate(block, context, perm_ptr);
949+
projection_block = projection.calculate(block, 0, context, perm_ptr);
950950
LOG_DEBUG(
951951
log, "Spent {} ms calculating projection {} for the part {}", watch.elapsed() / 1000, projection.name, new_data_part->name);
952952
}

src/Storages/MergeTree/MutateTask.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1419,13 +1419,14 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
14191419
if (ctx->count_lightweight_deleted_rows)
14201420
existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);
14211421

1422+
UInt64 starting_offset = (*ctx->mutate_entry)->rows_written;
14221423
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
14231424
{
14241425
Chunk squashed_chunk;
14251426

14261427
{
14271428
ProfileEventTimeIncrement<Microseconds> projection_watch(ProfileEvents::MutateTaskProjectionsCalculationMicroseconds);
1428-
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, ctx->context);
1429+
Block block_to_squash = ctx->projections_to_build[i]->calculate(cur_block, starting_offset, ctx->context);
14291430

14301431
/// Everything is deleted by lighweight delete
14311432
if (block_to_squash.rows() == 0)

src/Storages/ProjectionsDescription.cpp

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
323323
result.sample_block.erase("_part_offset");
324324
result.sample_block.insert(std::move(new_column));
325325
result.with_parent_part_offset = true;
326+
std::erase_if(result.required_columns, [](const String & s) { return s.contains("_part_offset"); });
326327
}
327328

328329
auto block = result.sample_block;
@@ -462,7 +463,8 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription &
462463
*this = getProjectionFromAST(definition_ast, new_columns, query_context);
463464
}
464465

465-
Block ProjectionDescription::calculate(const Block & block, ContextPtr context, const IColumnPermutation * perm_ptr) const
466+
Block ProjectionDescription::calculate(
467+
const Block & block, UInt64 starting_offset, ContextPtr context, const IColumnPermutation * perm_ptr) const
466468
{
467469
auto mut_context = Context::createCopy(context);
468470
/// We ignore aggregate_functions_null_for_empty cause it changes aggregate function types.
@@ -493,31 +495,34 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context,
493495
makeASTOperator("equals", std::make_shared<ASTIdentifier>(RowExistsColumn::name), std::make_shared<ASTLiteral>(1)));
494496
}
495497

496-
/// Create "_part_offset" column when needed for projection with parent part offsets
498+
/// Only keep required columns
497499
Block source_block = block;
500+
for (const auto & column : required_columns)
501+
source_block.insert(block.getByName(column));
502+
503+
/// Create "_part_offset" column when needed for projection with parent part offsets
498504
if (with_parent_part_offset)
499505
{
500506
chassert(sample_block.has("_parent_part_offset"));
501-
502-
/// Add "_part_offset" column if not present (needed for insertions but not mutations - materialize projections)
503-
if (!source_block.has("_part_offset"))
507+
chassert(!source_block.has("_part_offset"));
508+
auto uint64 = std::make_shared<DataTypeUInt64>();
509+
auto column = uint64->createColumn();
510+
auto & offset = assert_cast<ColumnUInt64 &>(*column).getData();
511+
offset.resize_exact(block.rows());
512+
if (perm_ptr)
504513
{
505-
auto uint64 = std::make_shared<DataTypeUInt64>();
506-
auto column = uint64->createColumn();
507-
auto & offset = assert_cast<ColumnUInt64 &>(*column).getData();
508-
offset.resize_exact(block.rows());
509-
if (perm_ptr)
510-
{
511-
for (size_t i = 0; i < block.rows(); ++i)
512-
offset[(*perm_ptr)[i]] = i;
513-
}
514-
else
515-
{
516-
iota(offset.data(), offset.size(), UInt64(0));
517-
}
518-
519-
source_block.insert({std::move(column), std::move(uint64), "_part_offset"});
514+
/// Insertion path
515+
chassert(starting_offset == 0);
516+
for (size_t i = 0; i < block.rows(); ++i)
517+
offset[(*perm_ptr)[i]] = i;
520518
}
519+
else
520+
{
521+
/// Rebuilding path
522+
iota(offset.data(), offset.size(), starting_offset);
523+
}
524+
525+
source_block.insert({std::move(column), std::move(uint64), "_part_offset"});
521526
}
522527

523528
auto builder = InterpreterSelectQuery(

src/Storages/ProjectionsDescription.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ struct ProjectionDescription
105105
* @brief Calculates the projection result for a given input block.
106106
*
107107
* @param block The input block used to evaluate the projection.
108+
* @param starting_offset The absolute starting row index of the current `block` within the
109+
* source data part. It is used to calculate the value of the virtual `_part_offset`
110+
* column (i.e., `_part_offset = starting_offset + row_index`). This column is
111+
* essential for mapping projection rows back to their original positions in the
112+
* parent part during merge or mutation.
108113
* @param context The query context. A copy will be made internally with adjusted settings.
109114
* @param perm_ptr Optional pointer to a permutation vector. If provided, it is used to map
110115
* the output rows back to their original order in the parent block. This is necessary
@@ -113,7 +118,7 @@ struct ProjectionDescription
113118
*
114119
* @return The resulting block after executing the projection query.
115120
*/
116-
Block calculate(const Block & block, ContextPtr context, const IColumnPermutation * perm_ptr = nullptr) const;
121+
Block calculate(const Block & block, UInt64 starting_offset, ContextPtr context, const IColumnPermutation * perm_ptr = nullptr) const;
117122

118123
String getDirectoryName() const { return name + ".proj"; }
119124
};

tests/integration/test_projection_rebuild_with_required_columns/test.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@ def started_cluster():
1919

2020

2121
def test_projection_rebuild_uses_only_required_columns(started_cluster):
22+
# Here we check that projection rebuild does not create too many temporary parts.
23+
# The size of temporary projection part is limited by min_insert_block_size_bytes/min_insert_block_size_rows, so they are changed in the config.
24+
2225
node1.query("drop table if exists tab")
2326
node1.query("create table tab (x UInt64, y UInt64, data String codec(NONE), v UInt8, projection p (select _part_offset order by y)) engine = ReplacingMergeTree(v) order by x settings allow_part_offset_column_in_projections=1, deduplicate_merge_projection_mode='rebuild';")
24-
node1.query("insert into tab select number, number, rightPad('', 100, 'a'), 0 from numbers(30000);")
27+
# Here we expect 3 parts to be inserted, contrilled by max_block_size=min_insert_block_size_rows=10000
28+
node1.query("insert into tab select number, number, rightPad('', 100, 'a'), 0 from numbers(30000) settings max_block_size=10000;")
29+
# Here we merge parts, and projections should be rebuild
30+
# Initially we kept `data` column in projection squash, ~10 temporary parts were created by min_insert_block_size_bytes limit
2531
node1.query("optimize table tab final settings mutations_sync=2, alter_sync=2;")
2632
node1.query("system flush logs;")
2733

2834
uuid = node1.query("select uuid from system.tables where table = 'tab';").strip()
2935
cnt = node1.query("select count() from system.text_log where query_id like '{}::all_%_2' and message like '%Reading%from part p_%from the beginning of the part%'".format(uuid))
30-
assert (cnt == '2\n')
31-
36+
# One projection part per source part
37+
assert (cnt == '3\n')
38+
# Here we check that _parent_part_offset is calculated properly. It was fixed in https://github.com/ClickHouse/ClickHouse/pull/93827
39+
assert(node1.query("select min(_parent_part_offset), max(_parent_part_offset) from mergeTreeProjection(default, tab, 'p')") == '0\t29999\n')
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
200000
2+
200000
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
DROP TABLE IF EXISTS test;
2+
3+
CREATE TABLE test
4+
(
5+
`a` Int32,
6+
`b` Int32,
7+
PROJECTION p
8+
(
9+
SELECT
10+
a,
11+
b,
12+
_part_offset
13+
ORDER BY b
14+
)
15+
)
16+
ENGINE = ReplacingMergeTree
17+
ORDER BY a
18+
SETTINGS index_granularity_bytes = 10485760, index_granularity = 8192, deduplicate_merge_projection_mode = 'rebuild';
19+
20+
INSERT INTO test SELECT number * 3, rand() FROM numbers(100000);
21+
INSERT INTO test SELECT number * 3 + 1, rand() FROM numbers(100000);
22+
SELECT sum(l._part_offset = r._parent_part_offset) FROM test l JOIN mergeTreeProjection(currentDatabase(), test, p) r USING (a) SETTINGS enable_analyzer = 1;
23+
24+
OPTIMIZE TABLE test FINAL;
25+
26+
SELECT sum(l._part_offset = r._parent_part_offset) FROM test l JOIN mergeTreeProjection(currentDatabase(), test, p) r USING (a) SETTINGS enable_analyzer = 1;
27+
28+
DROP TABLE test;

0 commit comments

Comments
 (0)