@@ -64,36 +64,61 @@ namespace
6464 return non_const_columns;
6565 }
6666
67+ // / build actions DAG from stack of steps
68+ ActionsDAGPtr buildActionsForPlanPath (std::vector<ActionsDAGPtr> & dag_stack)
69+ {
70+ if (dag_stack.empty ())
71+ return nullptr ;
72+
73+ ActionsDAGPtr path_actions = dag_stack.back ()->clone ();
74+ dag_stack.pop_back ();
75+ while (!dag_stack.empty ())
76+ {
77+ ActionsDAGPtr clone = dag_stack.back ()->clone ();
78+ logActionsDAG (" DAG to merge" , clone);
79+ dag_stack.pop_back ();
80+ path_actions->mergeInplace (std::move (*clone));
81+ }
82+ return path_actions;
83+ }
84+
6785 bool compareAggregationKeysWithDistinctColumns (
68- const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions )
86+ const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector<std::vector< ActionsDAGPtr>> actions_chain )
6987 {
7088 logDebug (" aggregation_keys" , aggregation_keys);
7189 logDebug (" aggregation_keys size" , aggregation_keys.size ());
7290 logDebug (" distinct_columns size" , distinct_columns.size ());
7391
74- std::set<std::string_view> original_distinct_columns ;
75- FindOriginalNodeForOutputName original_node_finder (path_actions) ;
76- for (const auto & column : distinct_columns )
92+ std::set<String> current_columns ( begin (distinct_columns), end (distinct_columns)) ;
93+ std::set<String> source_columns ;
94+ for (auto & actions : actions_chain )
7795 {
78- logDebug (" distinct column name" , column);
79- const auto * alias_node = original_node_finder.find (String (column));
80- if (!alias_node)
96+ FindOriginalNodeForOutputName original_node_finder (buildActionsForPlanPath (actions));
97+ for (const auto & column : current_columns)
8198 {
82- logDebug (" original name for alias is not found" , column);
83- original_distinct_columns.insert (column);
84- }
85- else
86- {
87- logDebug (" alias result name" , alias_node->result_name );
88- original_distinct_columns.insert (alias_node->result_name );
99+ logDebug (" distinct column name" , column);
100+ const auto * alias_node = original_node_finder.find (String (column));
101+ if (!alias_node)
102+ {
103+ logDebug (" original name for alias is not found" , column);
104+ source_columns.insert (String (column));
105+ }
106+ else
107+ {
108+ logDebug (" alias result name" , alias_node->result_name );
109+ source_columns.insert (alias_node->result_name );
110+ }
89111 }
112+
113+ current_columns = std::move (source_columns);
114+ source_columns.clear ();
90115 }
91116 // / if aggregation keys are part of distinct columns then rows already distinct
92117 for (const auto & key : aggregation_keys)
93118 {
94- if (!original_distinct_columns .contains (key))
119+ if (!current_columns .contains (key))
95120 {
96- logDebug (" aggregation key NOT found: {} " , key);
121+ logDebug (" aggregation key NOT found" , key);
97122 return false ;
98123 }
99124 }
@@ -122,30 +147,13 @@ namespace
122147 return false ;
123148 }
124149
125- // / build actions DAG from stack of steps
126- ActionsDAGPtr buildActionsForPlanPath (std::vector<ActionsDAGPtr> & dag_stack)
127- {
128- if (dag_stack.empty ())
129- return nullptr ;
130-
131- ActionsDAGPtr path_actions = dag_stack.back ()->clone ();
132- dag_stack.pop_back ();
133- while (!dag_stack.empty ())
134- {
135- ActionsDAGPtr clone = dag_stack.back ()->clone ();
136- logActionsDAG (" DAG to merge" , clone);
137- dag_stack.pop_back ();
138- path_actions->mergeInplace (std::move (*clone));
139- }
140- return path_actions;
141- }
142-
143150 bool passTillAggregation (const QueryPlan::Node * distinct_node)
144151 {
145152 const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step .get ());
146153 chassert (distinct_step);
147154
148155 std::vector<ActionsDAGPtr> dag_stack;
156+ std::vector<std::vector<ActionsDAGPtr>> actions_chain;
149157 const DistinctStep * inner_distinct_step = nullptr ;
150158 const IQueryPlanStep * aggregation_before_distinct = nullptr ;
151159 const QueryPlan::Node * node = distinct_node;
@@ -163,6 +171,12 @@ namespace
163171 break ;
164172 }
165173
174+ if (typeid_cast<const WindowStep *>(current_step))
175+ {
176+ actions_chain.push_back (std::move (dag_stack));
177+ dag_stack.clear ();
178+ }
179+
166180 if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
167181 dag_stack.push_back (expr->getExpression ());
168182 else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
@@ -177,16 +191,22 @@ namespace
177191
178192 if (aggregation_before_distinct)
179193 {
180- ActionsDAGPtr actions = buildActionsForPlanPath (dag_stack);
181- logActionsDAG ( " aggregation pass: merged DAG " , actions );
194+ if (actions_chain. empty ())
195+ actions_chain. push_back ( std::move (dag_stack) );
182196
183197 const auto distinct_columns = getDistinctColumns (distinct_step);
184198
185199 if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
186- return compareAggregationKeysWithDistinctColumns (aggregating_step->getParams ().keys , distinct_columns, actions);
200+ {
201+ return compareAggregationKeysWithDistinctColumns (
202+ aggregating_step->getParams ().keys , distinct_columns, std::move (actions_chain));
203+ }
187204 else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
188205 merging_aggregated_step)
189- return compareAggregationKeysWithDistinctColumns (merging_aggregated_step->getParams ().keys , distinct_columns, actions);
206+ {
207+ return compareAggregationKeysWithDistinctColumns (
208+ merging_aggregated_step->getParams ().keys , distinct_columns, std::move (actions_chain));
209+ }
190210 }
191211
192212 return false ;
0 commit comments