Skip to content

Commit d7ba41d

Browse files
alexey-milovidovKochetovNicolai
authored andcommitted
Merge pull request #11162 from azat/data-skip-index-merging-params-fix
[RFC] Fix data skipping indexes for columns with additional actions during merge (cherry picked from commit 8accde7)
1 parent 16af85c commit d7ba41d

File tree

5 files changed

+71
-5
lines changed

5 files changed

+71
-5
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
417417
}
418418

419419
ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone();
420+
ASTPtr skip_indices_expr_list = new_primary_key_expr_list->clone();
420421
ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone();
421422

422423
MergeTreeIndices new_indices;
@@ -446,6 +447,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
446447
{
447448
skip_indices_with_primary_key_expr_list->children.push_back(expr->clone());
448449
skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone());
450+
skip_indices_expr_list->children.push_back(expr->clone());
449451
}
450452

451453
indices_names.insert(new_indices.back()->name);
@@ -456,6 +458,11 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
456458
auto new_indices_with_primary_key_expr = ExpressionAnalyzer(
457459
skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false);
458460

461+
auto syntax_indices = SyntaxAnalyzer(global_context).analyze(
462+
skip_indices_with_primary_key_expr_list, all_columns);
463+
auto new_indices_expr = ExpressionAnalyzer(
464+
skip_indices_expr_list, syntax_indices, global_context).getActions(false);
465+
459466
auto syntax_sorting = SyntaxAnalyzer(global_context).analyze(
460467
skip_indices_with_sorting_key_expr_list, all_columns);
461468
auto new_indices_with_sorting_key_expr = ExpressionAnalyzer(
@@ -482,6 +489,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
482489

483490
setConstraints(metadata.constraints);
484491

492+
skip_indices_expr = new_indices_expr;
485493
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
486494
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
487495
}

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,7 @@ class MergeTreeData : public IStorage
680680
/// Secondary (data skipping) indices for MergeTree
681681
MergeTreeIndices skip_indices;
682682

683+
ExpressionActionsPtr skip_indices_expr;
683684
ExpressionActionsPtr primary_key_and_skip_indices_expr;
684685
ExpressionActionsPtr sorting_key_and_skip_indices_expr;
685686

src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -713,13 +713,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
713713

714714
Pipe pipe(std::move(input));
715715

716-
if (data.hasPrimaryKey() || data.hasSkipIndices())
716+
if (data.hasSortingKey())
717717
{
718-
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_and_skip_indices_expr);
718+
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr);
719719
pipe.addSimpleTransform(std::move(expr));
720-
721-
auto materializing = std::make_shared<MaterializingTransform>(pipe.getHeader());
722-
pipe.addSimpleTransform(std::move(materializing));
723720
}
724721

725722
pipes.emplace_back(std::move(pipe));
@@ -794,6 +791,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
794791
if (need_remove_expired_values)
795792
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
796793

794+
if (data.hasSkipIndices())
795+
{
796+
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, data.skip_indices_expr);
797+
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
798+
}
799+
797800
MergedBlockOutputStream to{
798801
new_data_part,
799802
merging_columns,
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
INSERT
2+
1 0
3+
1 1
4+
1 1
5+
INSERT
6+
1 0
7+
1 1
8+
1 0
9+
1 1
10+
1 2
11+
1 3
12+
1 1
13+
1 1
14+
1 3
15+
OPTIMIZE
16+
1 3
17+
1 3
18+
OPTIMIZE
19+
1 3
20+
1 3
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
DROP TABLE IF EXISTS data_01285;
2+
3+
SET max_threads=1;
4+
5+
6+
CREATE TABLE data_01285 (
7+
key Int,
8+
value SimpleAggregateFunction(max, Nullable(Int)),
9+
INDEX value_idx assumeNotNull(value) TYPE minmax GRANULARITY 1
10+
)
11+
ENGINE=AggregatingMergeTree()
12+
ORDER BY key;
13+
14+
SELECT 'INSERT';
15+
INSERT INTO data_01285 SELECT 1, number FROM numbers(2);
16+
SELECT * FROM data_01285;
17+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
18+
SELECT 'INSERT';
19+
INSERT INTO data_01285 SELECT 1, number FROM numbers(4);
20+
SELECT * FROM data_01285;
21+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
22+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
23+
SELECT 'OPTIMIZE';
24+
OPTIMIZE TABLE data_01285 FINAL;
25+
SELECT * FROM data_01285;
26+
-- before the fix value_idx contains one range {0, 0}
27+
-- and hence cannot find these record.
28+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
29+
-- one more time just in case
30+
SELECT 'OPTIMIZE';
31+
OPTIMIZE TABLE data_01285 FINAL;
32+
SELECT * FROM data_01285;
33+
-- and this passes even without fix.
34+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;

0 commit comments

Comments
 (0)