Skip to content

Commit 8accde7

Browse files
Merge pull request #11162 from azat/data-skip-index-merging-params-fix
[RFC] Fix data skipping indexes for columns with additional actions during merge
2 parents a8f5b10 + 9416e82 commit 8accde7

File tree

5 files changed

+71
-5
lines changed

5 files changed

+71
-5
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
423423
}
424424

425425
ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone();
426+
ASTPtr skip_indices_expr_list = new_primary_key_expr_list->clone();
426427
ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone();
427428

428429
MergeTreeIndices new_indices;
@@ -452,6 +453,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
452453
{
453454
skip_indices_with_primary_key_expr_list->children.push_back(expr->clone());
454455
skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone());
456+
skip_indices_expr_list->children.push_back(expr->clone());
455457
}
456458

457459
indices_names.insert(new_indices.back()->name);
@@ -462,6 +464,11 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
462464
auto new_indices_with_primary_key_expr = ExpressionAnalyzer(
463465
skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false);
464466

467+
auto syntax_indices = SyntaxAnalyzer(global_context).analyze(
468+
skip_indices_with_primary_key_expr_list, all_columns);
469+
auto new_indices_expr = ExpressionAnalyzer(
470+
skip_indices_expr_list, syntax_indices, global_context).getActions(false);
471+
465472
auto syntax_sorting = SyntaxAnalyzer(global_context).analyze(
466473
skip_indices_with_sorting_key_expr_list, all_columns);
467474
auto new_indices_with_sorting_key_expr = ExpressionAnalyzer(
@@ -494,6 +501,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
494501

495502
setConstraints(metadata.constraints);
496503

504+
skip_indices_expr = new_indices_expr;
497505
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
498506
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
499507
}

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,7 @@ class MergeTreeData : public IStorage
647647
/// Secondary (data skipping) indices for MergeTree
648648
MergeTreeIndices skip_indices;
649649

650+
ExpressionActionsPtr skip_indices_expr;
650651
ExpressionActionsPtr primary_key_and_skip_indices_expr;
651652
ExpressionActionsPtr sorting_key_and_skip_indices_expr;
652653

src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -715,13 +715,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
715715

716716
Pipe pipe(std::move(input));
717717

718-
if (data.hasPrimaryKey() || data.hasSkipIndices())
718+
if (data.hasSortingKey())
719719
{
720-
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_and_skip_indices_expr);
720+
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr);
721721
pipe.addSimpleTransform(std::move(expr));
722-
723-
auto materializing = std::make_shared<MaterializingTransform>(pipe.getHeader());
724-
pipe.addSimpleTransform(std::move(materializing));
725722
}
726723

727724
pipes.emplace_back(std::move(pipe));
@@ -796,6 +793,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
796793
if (need_remove_expired_values)
797794
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
798795

796+
if (data.hasSkipIndices())
797+
{
798+
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, data.skip_indices_expr);
799+
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
800+
}
801+
799802
MergedBlockOutputStream to{
800803
new_data_part,
801804
merging_columns,
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
INSERT
2+
1 0
3+
1 1
4+
1 1
5+
INSERT
6+
1 0
7+
1 1
8+
1 0
9+
1 1
10+
1 2
11+
1 3
12+
1 1
13+
1 1
14+
1 3
15+
OPTIMIZE
16+
1 3
17+
1 3
18+
OPTIMIZE
19+
1 3
20+
1 3
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
DROP TABLE IF EXISTS data_01285;
2+
3+
SET max_threads=1;
4+
5+
6+
CREATE TABLE data_01285 (
7+
key Int,
8+
value SimpleAggregateFunction(max, Nullable(Int)),
9+
INDEX value_idx assumeNotNull(value) TYPE minmax GRANULARITY 1
10+
)
11+
ENGINE=AggregatingMergeTree()
12+
ORDER BY key;
13+
14+
SELECT 'INSERT';
15+
INSERT INTO data_01285 SELECT 1, number FROM numbers(2);
16+
SELECT * FROM data_01285;
17+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
18+
SELECT 'INSERT';
19+
INSERT INTO data_01285 SELECT 1, number FROM numbers(4);
20+
SELECT * FROM data_01285;
21+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
22+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
23+
SELECT 'OPTIMIZE';
24+
OPTIMIZE TABLE data_01285 FINAL;
25+
SELECT * FROM data_01285;
26+
-- before the fix value_idx contains one range {0, 0}
27+
-- and hence cannot find these record.
28+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
29+
-- one more time just in case
30+
SELECT 'OPTIMIZE';
31+
OPTIMIZE TABLE data_01285 FINAL;
32+
SELECT * FROM data_01285;
33+
-- and this passes even without fix.
34+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;

0 commit comments

Comments
 (0)