Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Interpreters/ActionsDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class ActionsDAG
std::string dumpDAG() const;

void serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const;

static ActionsDAG deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context);

const Node & addInput(std::string name, DataTypePtr type);
const Node & addInput(ColumnWithTypeAndName column);
const Node & addColumn(ColumnWithTypeAndName column);
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ void ExpressionAnalysisResult::checkActions() const
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "PREWHERE cannot contain ARRAY JOIN action");
};

check_actions(prewhere_info->prewhere_actions);
check_actions(prewhere_info->prewhere_actions.value());
}
}

Expand Down
55 changes: 32 additions & 23 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,10 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
ActionDAGNodes added_filter_nodes = MergeTreeData::getFiltersForPrimaryKeyAnalysis(*this);
if (query_info_copy.prewhere_info)
{
if (query_info_copy.prewhere_info->prewhere_actions)
{
const auto & node
= query_info_copy.prewhere_info->prewhere_actions.findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
= query_info_copy.prewhere_info->prewhere_actions->findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
added_filter_nodes.nodes.push_back(&node);
}

Expand Down Expand Up @@ -1194,9 +1195,9 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
{
auto header = *source_header;

if (analysis_result.prewhere_info)
if (analysis_result.prewhere_info && analysis_result.prewhere_info->prewhere_actions)
{
header = analysis_result.prewhere_info->prewhere_actions.updateHeader(header);
header = analysis_result.prewhere_info->prewhere_actions->updateHeader(header);
if (analysis_result.prewhere_info->remove_prewhere_column)
header.erase(analysis_result.prewhere_info->prewhere_column_name);
}
Expand Down Expand Up @@ -1657,14 +1658,17 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.addStep(std::move(row_level_filter_step));
}

auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.prewhere_info->prewhere_actions.clone(),
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);
if (expressions.prewhere_info->prewhere_actions)
{
auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.prewhere_info->prewhere_actions->clone(),
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);

prewhere_step->setStepDescription("PREWHERE");
query_plan.addStep(std::move(prewhere_step));
prewhere_step->setStepDescription("PREWHERE");
query_plan.addStep(std::move(prewhere_step));
}
}
}
else
Expand Down Expand Up @@ -2219,13 +2223,16 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
});
}

auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions.clone());
pipe.addSimpleTransform([&](const SharedHeader & header)
if (prewhere_info.prewhere_actions)
{
return std::make_shared<FilterTransform>(
header, filter_actions,
prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
});
auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions->clone());
pipe.addSimpleTransform([&](const SharedHeader & header)
{
return std::make_shared<FilterTransform>(
header, filter_actions,
prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
});
}
}

auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
Expand Down Expand Up @@ -2310,10 +2317,10 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
{
NameSet columns;

if (prewhere_info)
if (prewhere_info && prewhere_info->prewhere_actions)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions.getRequiredColumns().getNames();
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());

if (prewhere_info->row_level_filter)
Expand Down Expand Up @@ -2382,10 +2389,10 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
NameSet required_columns_after_prewhere_set;

/// Collect required columns from prewhere expression actions.
if (prewhere_info)
if (prewhere_info && prewhere_info->prewhere_actions)
{
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
Block prewhere_actions_result = prewhere_info->prewhere_actions.getResultColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();

/// Populate required columns with the columns, added by PREWHERE actions and not removed afterwards.
/// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
Expand Down Expand Up @@ -2420,11 +2427,11 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
/// Remove columns which will be added by prewhere.
std::erase_if(required_columns, [&](const String & name) { return required_columns_after_prewhere_set.contains(name); });

if (prewhere_info)
if (prewhere_info && prewhere_info->prewhere_actions)
{
/// Don't remove columns which are needed to be aliased.
for (const auto & name : required_columns)
prewhere_info->prewhere_actions.tryRestoreColumn(name);
prewhere_info->prewhere_actions->tryRestoreColumn(name);

/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
Expand Down Expand Up @@ -2481,7 +2488,9 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 allow_exper
if (analysis_result.hasPrewhere())
{
auto & prewhere_info = analysis_result.prewhere_info;
filter_nodes.push_back(&prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name));

if (prewhere_info->prewhere_actions)
filter_nodes.push_back(&prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name));

if (prewhere_info->row_level_filter)
filter_nodes.push_back(&prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name));
Expand Down
25 changes: 17 additions & 8 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,10 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
if (!table_expression_query_info.prewhere_info)
return;

auto & prewhere_actions = table_expression_query_info.prewhere_info->prewhere_actions;
if (!table_expression_query_info.prewhere_info->prewhere_actions)
return;

auto & prewhere_actions = *table_expression_query_info.prewhere_info->prewhere_actions;

NameSet required_columns;
if (column_names.size() == 1)
Expand Down Expand Up @@ -896,7 +899,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres

updatePrewhereOutputsIfNeeded(table_expression_query_info, table_expression_data.getColumnNames(), storage_snapshot);

const auto add_filter = [&](FilterDAGInfo & filter_info, std::string description)
const auto add_filter = [&](FilterDAGInfo & filter_info, std::string description, bool is_row_policy = false)
{
bool is_final = table_expression_query_info.table_expression_modifiers
&& table_expression_query_info.table_expression_modifiers->hasFinal();
Expand All @@ -913,17 +916,21 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (!prewhere_info)
{
prewhere_info = std::make_shared<PrewhereInfo>();
prewhere_info->prewhere_actions = std::move(filter_info.actions);
prewhere_info->prewhere_column_name = filter_info.column_name;
prewhere_info->remove_prewhere_column = filter_info.do_remove_column;
prewhere_info->need_filter = true;
}
else if (!prewhere_info->row_level_filter)

if (is_row_policy)
{
prewhere_info->row_level_filter = std::move(filter_info.actions);
prewhere_info->row_level_column_name = filter_info.column_name;
prewhere_info->need_filter = true;
}
else if (!prewhere_info->prewhere_actions)
{
prewhere_info->prewhere_actions = std::move(filter_info.actions);
prewhere_info->prewhere_column_name = filter_info.column_name;
prewhere_info->remove_prewhere_column = filter_info.do_remove_column;
prewhere_info->need_filter = true;
}
else
{
where_filters.emplace_back(std::move(filter_info), std::move(description));
Expand All @@ -941,7 +948,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (row_policy_filter_info)
{
table_expression_data.setRowLevelFilterActions(row_policy_filter_info->actions.clone());
add_filter(*row_policy_filter_info, "Row-level security filter");
add_filter(*row_policy_filter_info, "Row-level security filter", true);
}

if (query_context->canUseParallelReplicasCustomKey())
Expand Down Expand Up @@ -1055,6 +1062,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
}

[[maybe_unused]] auto some_header = query_plan.getCurrentHeader();

auto parallel_replicas_enabled_for_storage = [](const StoragePtr & table, const Settings & query_settings)
{
if (!table->isMergeTree())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ UInt64 calculateHashFromStep(const SourceStepWithFilter & read)
if (const auto & snapshot = read.getStorageSnapshot())
hash.update(snapshot->storage.getStorageID().getFullTableName());
if (const auto & dag = read.getPrewhereInfo())
dag->prewhere_actions.updateHash(hash);
if (dag->prewhere_actions)
dag->prewhere_actions->updateHash(hash);
return hash.get64();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ ReadFromMergeTree * findReadingStep(const QueryPlan::Node & node)
ActionsDAG makeSourceDAG(ReadFromMergeTree & source)
{
if (const auto & prewhere_info = source.getPrewhereInfo())
return prewhere_info->prewhere_actions.clone();
if (prewhere_info->prewhere_actions)
return prewhere_info->prewhere_actions->clone();

return ActionsDAG(source.getOutputHeader()->getColumnsWithTypeAndName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ static void collectLazilyReadColumnNames(
if (prewhere_info->row_level_filter)
removeUsedColumnNames(*prewhere_info->row_level_filter, lazily_read_column_name_set, alias_index, prewhere_info->row_level_column_name);

removeUsedColumnNames(prewhere_info->prewhere_actions, lazily_read_column_name_set, alias_index, prewhere_info->prewhere_column_name);
removeUsedColumnNames(prewhere_info->prewhere_actions.value(), lazily_read_column_name_set, alias_index, prewhere_info->prewhere_column_name);
}

for (auto step_it = steps.rbegin(); step_it != steps.rend(); ++step_it)
Expand Down
50 changes: 40 additions & 10 deletions src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ ActionsDAG splitAndFillPrewhereInfo(
const std::list<const ActionsDAG::Node *> & prewhere_nodes_list)
{
prewhere_info->need_filter = true;
prewhere_info->remove_prewhere_column = remove_prewhere_column;

if (prewhere_info->remove_prewhere_column)
if (remove_prewhere_column)
{
removeFromOutput(filter_expression, filter_column_name);
auto & outputs = filter_expression.getOutputs();
Expand Down Expand Up @@ -94,22 +93,55 @@ ActionsDAG splitAndFillPrewhereInfo(
for (const auto * condition : prewhere_nodes_list)
conditions.push_back(split_result.split_nodes_mapping.at(condition));

/// Is it possible that prewhere_info->prewhere_actions was not empty?
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Just set prewhere manually.

/// Not sure, but just in case let's merge it
if (prewhere_info->prewhere_actions)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing about this function, splitAndFillPrewhereInfo, is that the previous implementation simply did not care whether PrewhereInfo already contained actions: it would just overwrite them. (In practice this never happened, because an early return in optimizePrewhere prevented the code from reaching this function.)

All that needs to be done is to properly construct a DAG that is a conjunction of the optimized filter_expression and the existing prewhere (preserving row-level filters). I just don't know how to do it :).

{
split_result.first.mergeInplace(std::move(*prewhere_info->prewhere_actions));

const auto * existing_prewhere_node = &split_result.first.findInOutputs(prewhere_info->prewhere_column_name);
conditions.push_back(existing_prewhere_node);

/// Should this be done after or before the mergeInPlace?
if (prewhere_info->remove_prewhere_column)
{
removeFromOutput(split_result.first, prewhere_info->prewhere_column_name);
// split_result.first.removeUnusedActions();
}
}

{
std::unordered_set<const ActionsDAG::Node *> first_outputs(
split_result.first.getOutputs().begin(), split_result.first.getOutputs().end());
for (const auto * input : split_result.first.getInputs())
{
if (!first_outputs.contains(input))
{
split_result.first.getOutputs().push_back(input);
/// Add column to second actions as input.
/// Do not add it to result, so it would be removed.
split_result.second.addInput(input->result_name, input->result_type);
}
}
}

Comment on lines +106 to +127
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't know what the right procedure is to merge these conditions. This was the closest I got to it (a few lines below, it will AND them together).

prewhere_info->prewhere_actions = std::move(split_result.first);

if (conditions.size() == 1)
{
prewhere_info->remove_prewhere_column = remove_prewhere_column;
prewhere_info->prewhere_column_name = conditions.front()->result_name;
if (prewhere_info->remove_prewhere_column)
prewhere_info->prewhere_actions.getOutputs().push_back(conditions.front());
prewhere_info->prewhere_actions->getOutputs().push_back(conditions.front());
}
else
{
prewhere_info->remove_prewhere_column = true;

FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto * node = &prewhere_info->prewhere_actions.addFunction(func_builder_and, std::move(conditions), {});
const auto * node = &prewhere_info->prewhere_actions->addFunction(func_builder_and, std::move(conditions), {});
prewhere_info->prewhere_column_name = node->result_name;
prewhere_info->prewhere_actions.getOutputs().push_back(node);
prewhere_info->prewhere_actions->getOutputs().push_back(node);
}

return std::move(split_result.second);
Expand Down Expand Up @@ -140,10 +172,6 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
if (!storage.canMoveConditionsToPrewhere())
return;

const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo();
if (storage_prewhere_info)
return;

/// TODO: We can also check for UnionStep, such as StorageBuffer and local distributed plans.
QueryPlan::Node * filter_node = (stack.rbegin() + 1)->node;
auto * filter_step = typeid_cast<FilterStep *>(filter_node->step.get());
Expand Down Expand Up @@ -179,8 +207,10 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)

Names queried_columns = source_step_with_filter->requiredSourceColumns();

const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo();

const auto & source_filter_actions_dag = source_step_with_filter->getFilterActionsDAG();
MergeTreeWhereOptimizer where_optimizer{
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
storage_metadata,
storage.getConditionSelectivityEstimatorByPredicate(storage_snapshot, source_filter_actions_dag ? &*source_filter_actions_dag : nullptr, context),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo();
if (storage_prewhere_info)
{
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions.clone(), storage_prewhere_info->prewhere_column_name);
if (storage_prewhere_info->prewhere_actions)
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions->clone(), storage_prewhere_info->prewhere_column_name);
if (storage_prewhere_info->row_level_filter)
source_step_with_filter->addFilter(storage_prewhere_info->row_level_filter->clone(), storage_prewhere_info->row_level_column_name);
}
Copy link
Copy Markdown
Contributor

@ilejn ilejn Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this code looks rather worrying (I don't mean the actual change).
It seems that some conditions, like Limit, are applied before the row policy. Is that true? If so, is it risky from a security standpoint?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit should be applied only after all the filters.

Also, it's used only in ReadFromPostgreSQL and ReadFromSystemNumbersStep, as far as I can see.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag, Fi
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
if (const auto prewhere_info = reading->getPrewhereInfo())
if (const auto prewhere_info = reading->getPrewhereInfo(); prewhere_info && prewhere_info->prewhere_actions)
{
/// Should ignore limit if there is filtering.
limit = 0;

//std::cerr << "====== Adding prewhere " << std::endl;
appendExpression(dag, prewhere_info->prewhere_actions);
appendExpression(dag, prewhere_info->prewhere_actions.value());
if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
Pipe pipe(std::make_shared<NullSource>(std::make_shared<const Block>(proj_snapshot->getSampleBlockForColumns(required_columns))));
if (projection_query_info.prewhere_info)
{
auto filter_actions = std::make_shared<ExpressionActions>(std::move(projection_query_info.prewhere_info->prewhere_actions));
auto filter_actions = std::make_shared<ExpressionActions>(std::move(projection_query_info.prewhere_info->prewhere_actions.value()));
pipe.addSimpleTransform(
[&](const SharedHeader & header)
{
Expand Down
Loading