Skip to content

Commit 459559b

Browse files
alexey-milovidov authored and KochetovNicolai committed
Merge pull request #11162 from azat/data-skip-index-merging-params-fix
[RFC] Fix data skipping indexes for columns with additional actions during merge (cherry picked from commit 8accde7)
1 parent 8f8a076 commit 459559b

File tree

5 files changed

+71
-2
lines changed

5 files changed

+71
-2
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
406406
}
407407

408408
ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone();
409+
ASTPtr skip_indices_expr_list = new_primary_key_expr_list->clone();
409410
ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone();
410411

411412
MergeTreeIndices new_indices;
@@ -435,6 +436,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
435436
{
436437
skip_indices_with_primary_key_expr_list->children.push_back(expr->clone());
437438
skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone());
439+
skip_indices_expr_list->children.push_back(expr->clone());
438440
}
439441

440442
indices_names.insert(new_indices.back()->name);
@@ -445,7 +447,12 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
445447
auto new_indices_with_primary_key_expr = ExpressionAnalyzer(
446448
skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false);
447449

448-
auto syntax_sorting = SyntaxAnalyzer(global_context, {}).analyze(
450+
auto syntax_indices = SyntaxAnalyzer(global_context, {}).analyze(
451+
skip_indices_with_primary_key_expr_list, all_columns);
452+
auto new_indices_expr = ExpressionAnalyzer(
453+
skip_indices_expr_list, syntax_indices, global_context).getActions(false);
454+
455+
auto syntax_sorting = SyntaxAnalyzer(global_context).analyze(
449456
skip_indices_with_sorting_key_expr_list, all_columns);
450457
auto new_indices_with_sorting_key_expr = ExpressionAnalyzer(
451458
skip_indices_with_sorting_key_expr_list, syntax_sorting, global_context).getActions(false);
@@ -471,6 +478,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
471478

472479
setConstraints(metadata.constraints);
473480

481+
skip_indices_expr = new_indices_expr;
474482
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
475483
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
476484
}

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,7 @@ class MergeTreeData : public IStorage
708708
/// Secondary (data skipping) indices for MergeTree
709709
MergeTreeIndices skip_indices;
710710

711+
ExpressionActionsPtr skip_indices_expr;
711712
ExpressionActionsPtr primary_key_and_skip_indices_expr;
712713
ExpressionActionsPtr sorting_key_and_skip_indices_expr;
713714

src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
683683
MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));
684684

685685
BlockInputStreamPtr stream = std::move(input);
686-
if (data.hasPrimaryKey() || data.hasSkipIndices())
686+
if (data.hasSortingKey())
687687
stream = std::make_shared<MaterializingBlockInputStream>(
688688
std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_and_skip_indices_expr));
689689

@@ -756,6 +756,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
756756
if (need_remove_expired_values)
757757
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);
758758

759+
if (data.hasSkipIndices())
760+
{
761+
merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, data.skip_indices_expr);
762+
merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
763+
}
764+
759765
MergedBlockOutputStream to{
760766
data,
761767
new_part_tmp_path,
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
INSERT
2+
1 0
3+
1 1
4+
1 1
5+
INSERT
6+
1 0
7+
1 1
8+
1 0
9+
1 1
10+
1 2
11+
1 3
12+
1 1
13+
1 1
14+
1 3
15+
OPTIMIZE
16+
1 3
17+
1 3
18+
OPTIMIZE
19+
1 3
20+
1 3
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
DROP TABLE IF EXISTS data_01285;
2+
3+
SET max_threads=1;
4+
5+
6+
CREATE TABLE data_01285 (
7+
key Int,
8+
value SimpleAggregateFunction(max, Nullable(Int)),
9+
INDEX value_idx assumeNotNull(value) TYPE minmax GRANULARITY 1
10+
)
11+
ENGINE=AggregatingMergeTree()
12+
ORDER BY key;
13+
14+
SELECT 'INSERT';
15+
INSERT INTO data_01285 SELECT 1, number FROM numbers(2);
16+
SELECT * FROM data_01285;
17+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
18+
SELECT 'INSERT';
19+
INSERT INTO data_01285 SELECT 1, number FROM numbers(4);
20+
SELECT * FROM data_01285;
21+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 1;
22+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
23+
SELECT 'OPTIMIZE';
24+
OPTIMIZE TABLE data_01285 FINAL;
25+
SELECT * FROM data_01285;
26+
-- before the fix value_idx contains one range {0, 0}
27+
-- and hence cannot find these records.
28+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;
29+
-- one more time just in case
30+
SELECT 'OPTIMIZE';
31+
OPTIMIZE TABLE data_01285 FINAL;
32+
SELECT * FROM data_01285;
33+
-- and this passes even without the fix.
34+
SELECT * FROM data_01285 WHERE assumeNotNull(value) = 3;

0 commit comments

Comments
 (0)