Skip to content

Commit ddd8268

Browse files
Handle removal of columns that are not handled by actions
1 parent 0f4a895 commit ddd8268

File tree

2 files changed

+62
-25
lines changed

2 files changed

+62
-25
lines changed

src/Processors/QueryPlan/ExpressionStep.cpp

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,27 +102,46 @@ IQueryPlanStep::UnusedColumnRemovalResult ExpressionStep::removeUnusedColumns(co
102102
{
103103
const auto split_results = actions_dag.splitPossibleOutputNames(required_outputs);
104104
const auto actions_dag_input_count_before = actions_dag.getInputs().size();
105-
const auto removed_any_actions = actions_dag.removeUnusedActions(split_results.output_names, remove_inputs);
106-
107-
if (!removed_any_actions && output_header.has_value() && output_header->columns() == required_outputs.size())
108-
return UnusedColumnRemovalResult{false, false};
109-
110-
auto & input_header = input_headers.front();
111-
// Number of columns that are not changed/removed by actions
105+
auto updated_actions = actions_dag.removeUnusedActions(split_results.output_names, remove_inputs);
106+
const auto & input_header = input_headers.front();
107+
// Number of input columns that are not removed by actions
112108
const auto pass_through_inputs = input_header.columns() - actions_dag_input_count_before;
113-
const auto update_inputs = remove_inputs
114-
&& (actions_dag.getInputs().size() < actions_dag_input_count_before || pass_through_inputs > split_results.not_output_names.size());
109+
const auto has_to_remove_any_pass_through_input = pass_through_inputs > split_results.not_output_names.size();
110+
const auto has_to_add_input_to_actions = !remove_inputs && has_to_remove_any_pass_through_input;
115111

116-
if (update_inputs)
112+
const auto build_required_inputs_set = [this, &not_output_names = split_results.not_output_names]()
117113
{
118114
std::unordered_set<String> required_inputs_set;
119115

120116
for (const auto * input_node : actions_dag.getInputs())
121117
required_inputs_set.insert(input_node->result_name);
122118

123-
for (const auto & pass_through_input : split_results.not_output_names)
119+
for (const auto & pass_through_input : not_output_names)
124120
required_inputs_set.insert(pass_through_input);
125121

122+
return required_inputs_set;
123+
};
124+
125+
if (has_to_add_input_to_actions)
126+
{
127+
const auto required_inputs_set = build_required_inputs_set();
128+
129+
for (const auto & name_and_type : input_header)
130+
if (!required_inputs_set.contains(name_and_type.name))
131+
actions_dag.addInput(name_and_type);
132+
133+
updated_actions = true;
134+
}
135+
136+
if (!updated_actions && output_header.has_value() && output_header->columns() == required_outputs.size())
137+
return UnusedColumnRemovalResult{false, false};
138+
139+
const auto update_inputs = remove_inputs
140+
&& (actions_dag.getInputs().size() < actions_dag_input_count_before || pass_through_inputs > split_results.not_output_names.size());
141+
142+
if (update_inputs)
143+
{
144+
const auto required_inputs_set = build_required_inputs_set();
126145
Header new_input_header{};
127146

128147
for (const auto & col_type_and_name : input_header)

src/Processors/QueryPlan/FilterStep.cpp

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ IQueryPlanStep::UnusedColumnRemovalResult FilterStep::removeUnusedColumns(const
300300
auto split_results = actions_dag.splitPossibleOutputNames(required_outputs);
301301
const auto actions_dag_input_count_before = actions_dag.getInputs().size();
302302

303-
304303
const auto required_columns_contains_filter = std::any_of(
305304
split_results.output_names.begin(),
306305
split_results.output_names.end(),
@@ -309,30 +308,49 @@ IQueryPlanStep::UnusedColumnRemovalResult FilterStep::removeUnusedColumns(const
309308
if (!required_columns_contains_filter)
310309
split_results.output_names.push_back(filter_column_name);
311310

312-
const auto removed_any_actions = actions_dag.removeUnusedActions(split_results.output_names, remove_inputs);
311+
auto updated_actions = actions_dag.removeUnusedActions(split_results.output_names, remove_inputs);
313312

314-
if (!removed_any_actions && output_header.has_value() && output_header->columns() == required_outputs.size())
313+
const auto & input_header = input_headers.front();
314+
// Number of input columns that are not removed by actions
315+
const auto pass_through_inputs = input_header.columns() - actions_dag_input_count_before;
316+
const auto has_to_remove_any_pass_through_input = pass_through_inputs > split_results.not_output_names.size();
317+
const auto has_to_add_input_to_actions = !remove_inputs && has_to_remove_any_pass_through_input;
318+
const auto build_required_inputs_set = [this, &not_output_names = split_results.not_output_names]()
319+
{
320+
std::unordered_set<String> required_inputs_set;
321+
322+
for (const auto * input_node : actions_dag.getInputs())
323+
required_inputs_set.insert(input_node->result_name);
324+
325+
for (const auto & pass_through_input : not_output_names)
326+
required_inputs_set.insert(pass_through_input);
327+
328+
return required_inputs_set;
329+
};
330+
331+
if (has_to_add_input_to_actions)
332+
{
333+
const auto required_inputs_set = build_required_inputs_set();
334+
335+
for (const auto & name_and_type : input_header)
336+
if (!required_inputs_set.contains(name_and_type.name))
337+
actions_dag.addInput(name_and_type);
338+
339+
updated_actions = true;
340+
}
341+
342+
if (!updated_actions && output_header.has_value() && output_header->columns() == required_outputs.size())
315343
return UnusedColumnRemovalResult{false, false};
316344

317345
// There cannot be more inputs in the DAG than columns in the input header
318346
chassert(actions_dag.getInputs().size() <= getInputHeaders().at(0).columns());
319347

320-
auto & input_header = input_headers.front();
321-
// Number of columns that are not changed/removed by actions
322-
const auto pass_through_inputs = input_header.columns() - actions_dag_input_count_before;
323348
const auto update_inputs = remove_inputs
324349
&& (actions_dag.getInputs().size() < actions_dag_input_count_before || pass_through_inputs > split_results.not_output_names.size());
325350

326351
if (update_inputs)
327352
{
328-
std::unordered_set<String> required_inputs_set;
329-
330-
for (const auto * input_node : actions_dag.getInputs())
331-
required_inputs_set.insert(input_node->result_name);
332-
333-
for (const auto & pass_through_input : split_results.not_output_names)
334-
required_inputs_set.insert(pass_through_input);
335-
353+
const auto required_inputs_set = build_required_inputs_set();
336354
Header new_input_header{};
337355

338356
for (const auto & col_type_and_name : input_header)

0 commit comments

Comments
 (0)