Skip to content

Commit ed67b9c

Browse files
Backport #87178 to 25.9: PR: fix LEFT/INNER ... RIGHT ... JOINS chain
1 parent 13b2258 commit ed67b9c

File tree

3 files changed

+81
-2
lines changed

3 files changed

+81
-2
lines changed

src/Planner/PlannerJoinTree.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2403,6 +2403,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
24032403
size_t joins_count = 0;
24042404
bool is_full_join = false;
24052405
bool is_global_join = false;
2406+
int first_left_or_inner_join_pos = -1;
2407+
int first_right_join_pos = -1;
24062408
/// For each table, table function, query, union table expressions prepare before query plan build
24072409
for (size_t i = 0; i < table_expressions_stack_size; ++i)
24082410
{
@@ -2427,16 +2429,35 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
24272429
if (join_node.getLocality() == JoinLocality::Global)
24282430
is_global_join = true;
24292431

2432+
// If a RIGHT JOIN appears after a LEFT/INNER JOIN in the chain, the LEFT/INNER JOIN can't be parallelized, so remember the first position of each kind
2433+
if (first_left_or_inner_join_pos < 0 && (join_node.getKind() == JoinKind::Left || join_node.getKind() == JoinKind::Inner))
2434+
first_left_or_inner_join_pos = i;
2435+
if (first_right_join_pos < 0 && join_node.getKind() == JoinKind::Right)
2436+
first_right_join_pos = i;
2437+
24302438
continue;
24312439
}
24322440

24332441
prepareBuildQueryPlanForTableExpression(table_expression, planner_context);
24342442
}
24352443

2436-
/// disable parallel replicas for n-way join with FULL JOIN or GLOBAL JOINS
2437-
if (joins_count > 1 && (is_full_join || is_global_join))
2444+
auto should_disable_parallel_replicas = [&]() -> bool
2445+
{
2446+
/// n-way join where a LEFT/INNER JOIN precedes a RIGHT JOIN, i.e. LEFT/INNER ... RIGHT ...
2447+
if (first_left_or_inner_join_pos >= 0 && first_right_join_pos >= 0 && first_left_or_inner_join_pos < first_right_join_pos)
2448+
return true;
2449+
2450+
/// n-way join that contains a FULL JOIN or a GLOBAL JOIN
2451+
if (joins_count > 1 && (is_full_join || is_global_join))
2452+
return true;
2453+
2454+
return false;
2455+
};
2456+
2457+
if (should_disable_parallel_replicas())
24382458
planner_context->getMutableQueryContext()->setSetting("enable_parallel_replicas", Field{0});
24392459

2460+
24402461
// in case of n-way JOINs the table expression stack contains several join nodes
24412462
// so we need to find the correct parent node for a table expression to pass into buildQueryPlanForTableExpression()
24422463
QueryTreeNodePtr parent_join_tree = join_tree_node;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- no parallel replicas --
2+
\N \N \N 0 3 B
3+
1 1 1 a 1 A 1 A
4+
2 2 2 b 2 B 2 B
5+
-- parallel replicas --
6+
\N \N \N 0 3 B
7+
1 1 1 a 1 A 1 A
8+
2 2 2 b 2 B 2 B
9+
-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count --
10+
0
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
DROP TABLE IF EXISTS tab;
2+
CREATE TABLE tab ( `k` Nullable(UInt32), `k1` Nullable(UInt32), `k2` Nullable(UInt32), `v` String ) ENGINE = MergeTree ORDER BY tuple();
3+
INSERT INTO tab VALUES (1, 1, 1, 'a'), (2, 2, 2, 'b');
4+
5+
DROP TABLE IF EXISTS mem;
6+
CREATE TABLE mem ( `k` UInt64, `v` String ) ENGINE = Join(ANY, LEFT, k);
7+
INSERT INTO mem VALUES (1, 'A'), (2, 'B'), (3, 'B');
8+
9+
DROP TABLE IF EXISTS mem2;
10+
CREATE TABLE mem2 ( `k` UInt64, `v` String ) ENGINE = Join(ANY, RIGHT, k);
11+
INSERT INTO mem2 VALUES (1, 'A'), (2, 'B'), (3, 'B');
12+
13+
SET enable_analyzer = 1;
14+
15+
SELECT '-- no parallel replicas --';
16+
SELECT *
17+
FROM tab
18+
ANY LEFT JOIN mem ON k1 = mem.k
19+
ANY RIGHT JOIN mem2 ON k2 = mem2.k
20+
ORDER BY tab.v
21+
SETTINGS enable_parallel_replicas=0;
22+
23+
SELECT '-- parallel replicas --';
24+
SELECT *
25+
FROM tab
26+
ANY LEFT JOIN mem ON k1 = mem.k
27+
ANY RIGHT JOIN mem2 ON k2 = mem2.k
28+
ORDER BY tab.v
29+
SETTINGS enable_parallel_replicas=1, max_parallel_replicas=3, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree=1;
30+
31+
32+
SELECT '-- explain: check parallel replicas is disabled, looking at ReadFromRemoteParallelReplicas steps count --';
33+
SELECT count()
34+
FROM
35+
(
36+
EXPLAIN
37+
SELECT *
38+
FROM tab
39+
ANY LEFT JOIN mem ON k1 = mem.k
40+
ANY RIGHT JOIN mem2 ON k2 = mem2.k
41+
ORDER BY tab.v ASC
42+
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = 1
43+
)
44+
WHERE explain ILIKE '%ReadFromRemoteParallelReplicas%';
45+
46+
DROP TABLE mem2;
47+
DROP TABLE mem;
48+
DROP TABLE tab;

0 commit comments

Comments
 (0)