Skip to content

Commit 33d491e

Browse files
Merge pull request #10516 from azat/dist-GROUP_BY-sharding_key-fixes
Disable GROUP BY sharding_key optimization by default (and fix for WITH ROLLUP/CUBE/TOTALS)
2 parents 3a982e0 + 170a341 commit 33d491e

8 files changed

+100
-8
lines changed

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct Settings : public SettingsCollection<Settings>
116116
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
117117
M(SettingBool, parallel_distributed_insert_select, false, "If true, distributed insert select query in the same cluster will be processed on local tables on every shard", 0) \
118118
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
119+
M(SettingBool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
119120
M(SettingUInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw).", 0) \
120121
M(SettingBool, force_optimize_skip_unused_shards_no_nested, false, "Do not apply force_optimize_skip_unused_shards for nested Distributed tables.", 0) \
121122
\

src/Storages/StorageDistributed.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,9 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
375375

376376
if (settings.distributed_group_by_no_merge)
377377
return true;
378+
if (!settings.optimize_distributed_group_by_sharding_key)
379+
return false;
380+
378381
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
379382
if (to_stage == QueryProcessingStage::WithMergeableState)
380383
return false;
@@ -385,8 +388,17 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
385388

386389
const auto & select = query_ptr->as<ASTSelectQuery &>();
387390

391+
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
392+
return false;
393+
394+
// TODO: The following can be optimized too (but with some caveats, will be addressed later):
395+
// - ORDER BY
396+
// - LIMIT BY
397+
// - LIMIT
388398
if (select.orderBy())
389399
return false;
400+
if (select.limitBy() || select.limitLength())
401+
return false;
390402

391403
if (select.distinct)
392404
{
@@ -402,11 +414,6 @@ bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryPro
402414
reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
403415
}
404416

405-
// This can use distributed_group_by_no_merge but in this case limit stage
406-
// should be done later (which is not the case right now).
407-
if (select.limitBy() || select.limitLength())
408-
return false;
409-
410417
const ASTPtr group_by = select.groupBy();
411418
if (!group_by)
412419
{

tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.reference

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,3 @@ optimize_skip_unused_shards
66
optimize_skip_unused_shards lack of WHERE
77
0
88
1
9-
0
10-
1

tests/queries/0_stateless/01213_optimize_skip_unused_shards_DISTINCT.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no
1212
SELECT 'optimize_skip_unused_shards';
1313
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
1414
-- check that querying all shards is ok
15-
-- (there will be duplicates, since the INSERT was done via local table)
1615
SELECT 'optimize_skip_unused_shards lack of WHERE';
1716
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;
1817

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Distributed(number)-over-Distributed(number)
2+
1 0
3+
1 1
4+
1 0
5+
1 1
6+
1 0
7+
1 1
8+
1 0
9+
1 1
10+
Distributed(rand)-over-Distributed(number)
11+
4 0
12+
4 1
13+
Distributed(rand)-over-Distributed(rand)
14+
2 0
15+
2 1
16+
2 0
17+
2 1
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-- TODO: correct testing with real unique shards
2+
3+
set optimize_distributed_group_by_sharding_key=1;
4+
5+
drop table if exists dist_01247;
6+
drop table if exists dist_layer_01247;
7+
drop table if exists data_01247;
8+
9+
create table data_01247 as system.numbers engine=Memory();
10+
-- since data is not inserted via distributed it will have duplicates
11+
-- (and this is how we ensure that this optimization will work)
12+
insert into data_01247 select * from system.numbers limit 2;
13+
14+
set max_distributed_connections=1;
15+
set optimize_skip_unused_shards=1;
16+
17+
select 'Distributed(number)-over-Distributed(number)';
18+
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
19+
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
20+
select count(), * from dist_01247 group by number;
21+
drop table if exists dist_01247;
22+
drop table if exists dist_layer_01247;
23+
24+
select 'Distributed(rand)-over-Distributed(number)';
25+
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
26+
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, rand());
27+
select count(), * from dist_01247 group by number;
28+
drop table if exists dist_01247;
29+
drop table if exists dist_layer_01247;
30+
31+
select 'Distributed(rand)-over-Distributed(rand)';
32+
create table dist_layer_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, rand());
33+
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01247, number);
34+
select count(), * from dist_01247 group by number;
35+
drop table dist_01247;
36+
drop table dist_layer_01247;

tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.reference

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,24 @@ GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
6767
1 1
6868
1 0
6969
1 1
70+
extremes
71+
1 0
72+
1 1
73+
1 0
74+
1 1
75+
76+
1 0
77+
1 1
78+
WITH TOTALS
79+
2 0
80+
2 1
81+
82+
4 0
83+
WITH ROLLUP
84+
2 0
85+
2 1
86+
4 0
87+
WITH CUBE
88+
2 0
89+
2 1
90+
4 0

tests/queries/0_stateless/01247_distributed_group_by_no_merge_GROUP_BY_injective_sharding_key.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
-- TODO: correct testing with real unique shards
2+
3+
set optimize_distributed_group_by_sharding_key=1;
4+
15
drop table if exists dist_01247;
26
drop table if exists data_01247;
37

@@ -60,3 +64,12 @@ select 'GROUP BY (Distributed-over-Distributed)';
6064
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
6165
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
6266
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;
67+
68+
select 'extremes';
69+
select count(), * from dist_01247 group by number settings extremes=1;
70+
select 'WITH TOTALS';
71+
select count(), * from dist_01247 group by number with totals;
72+
select 'WITH ROLLUP';
73+
select count(), * from dist_01247 group by number with rollup;
74+
select 'WITH CUBE';
75+
select count(), * from dist_01247 group by number with cube;

0 commit comments

Comments
 (0)