Skip to content

Commit 04b939d

Browse files
Backport #63776 to 24.4: Fix: remove redundant distinct with window functions
1 parent 38adf8f commit 04b939d

File tree

3 files changed

+86
-38
lines changed

3 files changed

+86
-38
lines changed

src/Processors/QueryPlan/Optimizations/removeRedundantDistinct.cpp

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -64,36 +64,61 @@ namespace
6464
return non_const_columns;
6565
}
6666

67+
/// build actions DAG from stack of steps
68+
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
69+
{
70+
if (dag_stack.empty())
71+
return nullptr;
72+
73+
ActionsDAGPtr path_actions = dag_stack.back()->clone();
74+
dag_stack.pop_back();
75+
while (!dag_stack.empty())
76+
{
77+
ActionsDAGPtr clone = dag_stack.back()->clone();
78+
logActionsDAG("DAG to merge", clone);
79+
dag_stack.pop_back();
80+
path_actions->mergeInplace(std::move(*clone));
81+
}
82+
return path_actions;
83+
}
84+
6785
bool compareAggregationKeysWithDistinctColumns(
68-
const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
86+
const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector<std::vector<ActionsDAGPtr>> actions_chain)
6987
{
7088
logDebug("aggregation_keys", aggregation_keys);
7189
logDebug("aggregation_keys size", aggregation_keys.size());
7290
logDebug("distinct_columns size", distinct_columns.size());
7391

74-
std::set<std::string_view> original_distinct_columns;
75-
FindOriginalNodeForOutputName original_node_finder(path_actions);
76-
for (const auto & column : distinct_columns)
92+
std::set<String> current_columns(begin(distinct_columns), end(distinct_columns));
93+
std::set<String> source_columns;
94+
for (auto & actions : actions_chain)
7795
{
78-
logDebug("distinct column name", column);
79-
const auto * alias_node = original_node_finder.find(String(column));
80-
if (!alias_node)
96+
FindOriginalNodeForOutputName original_node_finder(buildActionsForPlanPath(actions));
97+
for (const auto & column : current_columns)
8198
{
82-
logDebug("original name for alias is not found", column);
83-
original_distinct_columns.insert(column);
84-
}
85-
else
86-
{
87-
logDebug("alias result name", alias_node->result_name);
88-
original_distinct_columns.insert(alias_node->result_name);
99+
logDebug("distinct column name", column);
100+
const auto * alias_node = original_node_finder.find(String(column));
101+
if (!alias_node)
102+
{
103+
logDebug("original name for alias is not found", column);
104+
source_columns.insert(String(column));
105+
}
106+
else
107+
{
108+
logDebug("alias result name", alias_node->result_name);
109+
source_columns.insert(alias_node->result_name);
110+
}
89111
}
112+
113+
current_columns = std::move(source_columns);
114+
source_columns.clear();
90115
}
91116
/// if aggregation keys are part of distinct columns then rows already distinct
92117
for (const auto & key : aggregation_keys)
93118
{
94-
if (!original_distinct_columns.contains(key))
119+
if (!current_columns.contains(key))
95120
{
96-
logDebug("aggregation key NOT found: {}", key);
121+
logDebug("aggregation key NOT found", key);
97122
return false;
98123
}
99124
}
@@ -122,30 +147,13 @@ namespace
122147
return false;
123148
}
124149

125-
/// build actions DAG from stack of steps
126-
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
127-
{
128-
if (dag_stack.empty())
129-
return nullptr;
130-
131-
ActionsDAGPtr path_actions = dag_stack.back()->clone();
132-
dag_stack.pop_back();
133-
while (!dag_stack.empty())
134-
{
135-
ActionsDAGPtr clone = dag_stack.back()->clone();
136-
logActionsDAG("DAG to merge", clone);
137-
dag_stack.pop_back();
138-
path_actions->mergeInplace(std::move(*clone));
139-
}
140-
return path_actions;
141-
}
142-
143150
bool passTillAggregation(const QueryPlan::Node * distinct_node)
144151
{
145152
const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
146153
chassert(distinct_step);
147154

148155
std::vector<ActionsDAGPtr> dag_stack;
156+
std::vector<std::vector<ActionsDAGPtr>> actions_chain;
149157
const DistinctStep * inner_distinct_step = nullptr;
150158
const IQueryPlanStep * aggregation_before_distinct = nullptr;
151159
const QueryPlan::Node * node = distinct_node;
@@ -163,6 +171,12 @@ namespace
163171
break;
164172
}
165173

174+
if (typeid_cast<const WindowStep *>(current_step))
175+
{
176+
actions_chain.push_back(std::move(dag_stack));
177+
dag_stack.clear();
178+
}
179+
166180
if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
167181
dag_stack.push_back(expr->getExpression());
168182
else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
@@ -177,16 +191,22 @@ namespace
177191

178192
if (aggregation_before_distinct)
179193
{
180-
ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
181-
logActionsDAG("aggregation pass: merged DAG", actions);
194+
if (actions_chain.empty())
195+
actions_chain.push_back(std::move(dag_stack));
182196

183197
const auto distinct_columns = getDistinctColumns(distinct_step);
184198

185199
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
186-
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
200+
{
201+
return compareAggregationKeysWithDistinctColumns(
202+
aggregating_step->getParams().keys, distinct_columns, std::move(actions_chain));
203+
}
187204
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
188205
merging_aggregated_step)
189-
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
206+
{
207+
return compareAggregationKeysWithDistinctColumns(
208+
merging_aggregated_step->getParams().keys, distinct_columns, std::move(actions_chain));
209+
}
190210
}
191211

192212
return false;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
1
2+
2
3+
3
4+
--------
5+
1 2023-01-14 00:00:00
6+
2 2023-01-14 00:00:00
7+
3 2023-01-14 00:00:00
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
DROP TABLE IF EXISTS tab;
2+
DROP TABLE IF EXISTS tab_v;
3+
4+
CREATE TABLE tab (id Int32, val Nullable(Float64), dt Nullable(DateTime64(6)), type Nullable(Int32)) ENGINE = MergeTree ORDER BY id;
5+
6+
insert into tab values (1,10,'2023-01-14 00:00:00',1),(2,20,'2023-01-14 00:00:00',1),(3,20,'2023-01-14 00:00:00',2),(4,40,'2023-01-14 00:00:00',3),(5,50,'2023-01-14 00:00:00',3);
7+
8+
CREATE VIEW tab_v AS SELECT
9+
t1.type AS type,
10+
sum(t1.val) AS sval,
11+
toStartOfDay(t1.dt) AS sday,
12+
anyLast(sval) OVER w AS lval
13+
FROM tab AS t1
14+
GROUP BY
15+
type,
16+
sday
17+
WINDOW w AS (PARTITION BY type);
18+
19+
select distinct type from tab_v order by type;
20+
select '--------';
21+
select distinct type, sday from tab_v order by type, sday;

0 commit comments

Comments
 (0)