Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4c36cd1
Add converting step for 'join using'
vdimir Jan 31, 2021
f5b9801
Fix converting right key type in join using
vdimir Feb 1, 2021
d15c1a2
Fix TableJoin, upd 01675_join_implicit_cast
vdimir Feb 3, 2021
435f63f
Calculate common type for join using in TreeRewriter
vdimir Feb 9, 2021
4203dd5
Give up on name mismatch in inferJoinKeyCommonType
vdimir Feb 9, 2021
9b79ab2
Fix style in appendJoin
vdimir Feb 10, 2021
cd7d958
Add tests to 01720_join_implicit_cast - switch, use_nulls
vdimir Feb 17, 2021
a378bd0
Perform implicit type conversion for JOIN ON keys
vdimir Feb 18, 2021
3a7eddc
Remove addRequiredLeftColumn, some tests for join on different types
vdimir Feb 18, 2021
456414b
Fix converting join on keys, move actions into TableJoin
vdimir Feb 18, 2021
1e37d7c
Add comments to TableJoin::inferJoinKeyCommonType
vdimir Feb 18, 2021
ab0719c
More tests for join on key type convert
vdimir Feb 19, 2021
dc9e660
Remove 'error' from comment from 01710_join_use_nulls
vdimir Feb 19, 2021
1a26310
Split test join_implicit_cast (due to timeouts in flaky checks)
vdimir Feb 19, 2021
7b8b77f
Add tests to join_implicit_cast checks column name clash
vdimir Feb 19, 2021
4c6cde4
Merge remote-tracking branch 'upstream/master' into join-cast-types-v2
vdimir Feb 25, 2021
b7c7c97
Merge branch 'master' into join-cast-types-v2
vdimir Mar 1, 2021
e6406c3
Refactor join: make IJoin implementations independent from TableJoin
vdimir Mar 5, 2021
b0f6f30
Revert "Refactor join: make IJoin implementations independent from Ta…
vdimir Mar 5, 2021
5c18bbe
Clean converting-related fields in TableJoin::resetCollected
vdimir Mar 5, 2021
4a8708f
Clean subqueries_for_sets before second analyze in InterpreterSelectQ…
vdimir Mar 5, 2021
3da3794
Add test for cast join on keys with second analyze
vdimir Mar 5, 2021
354757d
Add logging for need_analyze_again in InterpreterSelectQuery
vdimir Mar 8, 2021
cc770ad
Fixes for join key inferring
vdimir Mar 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions src/Interpreters/ActionsDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,19 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
const ColumnsWithTypeAndName & source,
const ColumnsWithTypeAndName & result,
MatchColumnsMode mode,
bool ignore_constant_values)
bool ignore_constant_values,
bool add_casted_columns,
NameToNameMap * new_names)
{
size_t num_input_columns = source.size();
size_t num_result_columns = result.size();

if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);

if (add_casted_columns && mode != MatchColumnsMode::Name)
throw Exception("Converting with add_casted_columns supported only for MatchColumnsMode::Name", ErrorCodes::LOGICAL_ERROR);

auto actions_dag = std::make_shared<ActionsDAG>(source);
std::vector<Node *> projection(num_result_columns);

Expand All @@ -706,12 +711,13 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
{
const auto & res_elem = result[result_col_num];
Node * src_node = nullptr;
Node * dst_node = nullptr;

switch (mode)
{
case MatchColumnsMode::Position:
{
src_node = actions_dag->inputs[result_col_num];
src_node = dst_node = actions_dag->inputs[result_col_num];
break;
}

Expand All @@ -722,7 +728,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);

src_node = actions_dag->inputs[input.front()];
src_node = dst_node = actions_dag->inputs[input.front()];
input.pop_front();
break;
}
Expand All @@ -731,10 +737,10 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
/// Check constants.
if (const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
{
if (const auto * src_const = typeid_cast<const ColumnConst *>(src_node->column.get()))
if (const auto * src_const = typeid_cast<const ColumnConst *>(dst_node->column.get()))
{
if (ignore_constant_values)
src_node = const_cast<Node *>(&actions_dag->addColumn(res_elem, true));
dst_node = const_cast<Node *>(&actions_dag->addColumn(res_elem, true));
else if (res_const->getField() != src_const->getField())
throw Exception("Cannot convert column " + backQuote(res_elem.name) + " because "
"it is constant but values of constants are different in source and result",
Expand All @@ -747,35 +753,57 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
}

/// Add CAST function to convert into result type if needed.
if (!res_elem.type->equals(*src_node->result_type))
if (!res_elem.type->equals(*dst_node->result_type))
{
ColumnWithTypeAndName column;
column.name = res_elem.type->getName();
column.column = DataTypeString().createColumnConst(0, column.name);
column.type = std::make_shared<DataTypeString>();

auto * right_arg = const_cast<Node *>(&actions_dag->addColumn(std::move(column), true));
auto * left_arg = src_node;
auto * left_arg = dst_node;

FunctionCast::Diagnostic diagnostic = {src_node->result_name, res_elem.name};
FunctionCast::Diagnostic diagnostic = {dst_node->result_name, res_elem.name};
FunctionOverloadResolverPtr func_builder_cast =
std::make_shared<FunctionOverloadResolverAdaptor>(
CastOverloadResolver<CastType::nonAccurate>::createImpl(false, std::move(diagnostic)));

Inputs children = { left_arg, right_arg };
src_node = &actions_dag->addFunction(func_builder_cast, std::move(children), {}, true);
dst_node = &actions_dag->addFunction(func_builder_cast, std::move(children), {}, true);
}

if (src_node->column && isColumnConst(*src_node->column) && !(res_elem.column && isColumnConst(*res_elem.column)))
if (dst_node->column && isColumnConst(*dst_node->column) && !(res_elem.column && isColumnConst(*res_elem.column)))
{
Inputs children = {src_node};
src_node = &actions_dag->addFunction(func_builder_materialize, std::move(children), {}, true);
Inputs children = {dst_node};
dst_node = &actions_dag->addFunction(func_builder_materialize, std::move(children), {}, true);
}

if (src_node->result_name != res_elem.name)
src_node = &actions_dag->addAlias(*src_node, res_elem.name, true);
if (dst_node->result_name != res_elem.name)
{
if (add_casted_columns)
{
if (inputs.contains(dst_node->result_name))
throw Exception("Cannot convert column " + backQuote(res_elem.name) +
" to "+ backQuote(dst_node->result_name) +
" because other column have same name",
ErrorCodes::ILLEGAL_COLUMN);
if (new_names)
new_names->emplace(res_elem.name, dst_node->result_name);

projection[result_col_num] = src_node;
/// Leave current column on same place, add converted to back
projection[result_col_num] = src_node;
projection.push_back(dst_node);
}
else
{
dst_node = &actions_dag->addAlias(*dst_node, res_elem.name, true);
projection[result_col_num] = dst_node;
}
}
else
{
projection[result_col_num] = dst_node;
}
}

actions_dag->removeUnusedActions(projection);
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/ActionsDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,15 @@ class ActionsDAG
/// Create ActionsDAG which converts block structure from source to result.
/// It is needed to convert result from different sources to the same structure, e.g. for UNION query.
/// Conversion should be possible with only usage of CAST function and renames.
/// @param ignore_constant_values - Do not check that constants are same. Use value from result_header.
/// @param add_casted_columns - Create new columns with converted values instead of replacing original.
static ActionsDAGPtr makeConvertingActions(
const ColumnsWithTypeAndName & source,
const ColumnsWithTypeAndName & result,
MatchColumnsMode mode,
bool ignore_constant_values = false); /// Do not check that constants are same. Use value from result_header.
bool ignore_constant_values = false,
bool add_casted_columns = false,
NameToNameMap * new_names = nullptr);

/// Create expression which add const column and then materialize it.
static ActionsDAGPtr makeAddingColumnActions(ColumnWithTypeAndName column);
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ActionsVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ void ActionsMatcher::visit(const ASTIdentifier & identifier, const ASTPtr & ast,
if (column_name_type.name == column_name)
{
throw Exception("Column " + backQuote(column_name) + " is not under aggregate function and not in GROUP BY",
ErrorCodes::NOT_AN_AGGREGATE);
ErrorCodes::NOT_AN_AGGREGATE);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ ExpressionActionsChain::JoinStep::JoinStep(
for (const auto & column : result_columns)
required_columns.emplace_back(column.name, column.type);

analyzed_join->addJoinedColumnsAndCorrectNullability(result_columns);
analyzed_join->addJoinedColumnsAndCorrectTypes(result_columns);
}

void ExpressionActionsChain::JoinStep::finalize(const Names & required_output_)
Expand All @@ -747,8 +747,8 @@ void ExpressionActionsChain::JoinStep::finalize(const Names & required_output_)
}

/// Result will also contain joined columns.
for (const auto & column : analyzed_join->columnsAddedByJoin())
required_names.emplace(column.name);
for (const auto & column_name : analyzed_join->columnsAddedByJoin())
required_names.emplace(column_name);

for (const auto & column : result_columns)
{
Expand Down
41 changes: 25 additions & 16 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@

#include <DataTypes/DataTypeFactory.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/misc.h>

#include <Interpreters/ActionsVisitor.h>

#include <Interpreters/GlobalSubqueriesVisitor.h>
#include <Interpreters/GetAggregatesVisitor.h>
#include <Interpreters/GlobalSubqueriesVisitor.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/join_common.h>
#include <Interpreters/misc.h>

#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
Expand Down Expand Up @@ -208,13 +207,12 @@ void ExpressionAnalyzer::analyzeAggregation()
{
getRootActionsNoMakeSet(analyzedJoin().leftKeysList(), true, temp_actions, false);
auto sample_columns = temp_actions->getResultColumns();
analyzedJoin().addJoinedColumnsAndCorrectNullability(sample_columns);
analyzedJoin().addJoinedColumnsAndCorrectTypes(sample_columns);
temp_actions = std::make_shared<ActionsDAG>(sample_columns);
}

columns_after_join = columns_after_array_join;
const auto & added_by_join = analyzedJoin().columnsAddedByJoin();
columns_after_join.insert(columns_after_join.end(), added_by_join.begin(), added_by_join.end());
analyzedJoin().addJoinedColumnsAndCorrectTypes(columns_after_join, false);
}

has_aggregation = makeAggregateDescriptions(temp_actions);
Expand Down Expand Up @@ -724,13 +722,17 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain &

JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
{
JoinPtr table_join = makeTableJoin(*syntax->ast_join);

ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
const ColumnsWithTypeAndName & left_sample_columns = chain.getLastStep().getResultColumns();
JoinPtr table_join = makeTableJoin(*syntax->ast_join, left_sample_columns);

chain.steps.push_back(std::make_unique<ExpressionActionsChain::JoinStep>(
syntax->analyzed_join, table_join, step.getResultColumns()));
if (syntax->analyzed_join->needConvert())
{
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(syntax->analyzed_join->leftConvertingActions()));
chain.addStep();
}

ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
chain.steps.push_back(std::make_unique<ExpressionActionsChain::JoinStep>(syntax->analyzed_join, table_join, step.getResultColumns()));
chain.addStep();
return table_join;
}
Expand Down Expand Up @@ -795,7 +797,8 @@ static std::shared_ptr<IJoin> makeJoin(std::shared_ptr<TableJoin> analyzed_join,
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block);
}

JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQueryElement & join_element)
JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
const ASTTablesInSelectQueryElement & join_element, const ColumnsWithTypeAndName & left_sample_columns)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
auto join_hash = join_element.getTreeHash();
Expand Down Expand Up @@ -831,7 +834,13 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer
}

/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_join.setJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside
subquery_for_join.addJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside

const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName();
bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns);
if (need_convert)
subquery_for_join.addJoinActions(std::make_shared<ExpressionActions>(syntax->analyzed_join->rightConvertingActions()));

subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block, context);

/// Do not make subquery for join over dictionary.
Expand Down Expand Up @@ -1426,9 +1435,9 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (query_analyzer.hasTableJoin())
{
query_analyzer.appendJoinLeftKeys(chain, only_types || !first_stage);

before_join = chain.getLastActions();
join = query_analyzer.appendJoin(chain);
converting_join_columns = query_analyzer.analyzedJoin().leftConvertingActions();
chain.addStep();
}

Expand Down
14 changes: 9 additions & 5 deletions src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#pragma once

#include <DataStreams/IBlockStream_fwd.h>
#include <Columns/FilterDescription.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/join_common.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/DatabaseCatalog.h>

namespace DB
{
Expand Down Expand Up @@ -199,6 +200,7 @@ struct ExpressionAnalysisResult
ActionsDAGPtr before_array_join;
ArrayJoinActionPtr array_join;
ActionsDAGPtr before_join;
ActionsDAGPtr converting_join_columns;
JoinPtr join;
ActionsDAGPtr before_where;
ActionsDAGPtr before_aggregation;
Expand Down Expand Up @@ -313,7 +315,9 @@ class SelectQueryExpressionAnalyzer : public ExpressionAnalyzer
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);

JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
JoinPtr makeTableJoin(
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_sample_columns);

const ASTSelectQuery * getAggregatingQuery() const;

Expand Down
22 changes: 17 additions & 5 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(

if (need_analyze_again)
{
subquery_for_sets = std::move(query_analyzer->getSubqueriesForSets());
LOG_TRACE(log, "Running 'analyze' second time");
query_analyzer->getSubqueriesForSets().clear();
subquery_for_sets = SubqueriesForSets();

/// Do not try move conditions to PREWHERE for the second time.
/// Otherwise, we won't be able to fallback from inefficient PREWHERE to WHERE later.
analyze(/* try_move_to_prewhere = */ false);
Expand Down Expand Up @@ -1002,14 +1005,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
query_plan.addStep(std::move(before_join_step));
}

/// Optional step to convert key columns to common supertype.
/// Columns with changed types will be returned to user,
/// so its only suitable for `USING` join.
if (expressions.converting_join_columns)
{
QueryPlanStepPtr convert_join_step = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.converting_join_columns);
convert_join_step->setStepDescription("Convert JOIN columns");
query_plan.addStep(std::move(convert_join_step));
}

if (expressions.hasJoin())
{
Block join_result_sample;
JoinPtr join = expressions.join;

join_result_sample = JoiningTransform::transformHeader(
query_plan.getCurrentDataStream().header, expressions.join);

QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
expressions.join);
Expand All @@ -1019,6 +1030,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu

if (expressions.join_has_delayed_stream)
{
const Block & join_result_sample = query_plan.getCurrentDataStream().header;
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, settings.max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
auto add_non_joined_rows_step = std::make_unique<AddingDelayedSourceStep>(
Expand Down
14 changes: 12 additions & 2 deletions src/Interpreters/SubqueryForSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ void SubqueryForSet::renameColumns(Block & block)
}
}

void SubqueryForSet::setJoinActions(ExpressionActionsPtr actions)
void SubqueryForSet::addJoinActions(ExpressionActionsPtr actions)
{
actions->execute(sample_block);
joined_block_actions = actions;
if (joined_block_actions == nullptr)
{
joined_block_actions = actions;
}
else
{
auto new_dag = ActionsDAG::merge(
std::move(*joined_block_actions->getActionsDAG().clone()),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we split joined_block_actions into separate variables not to use merge here? Now it consist from createJoinedBlockActions and additional converting step. @KochetovNicolai

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, this actions are applied to right part of join. And we don't use separate step for this actions for now. So, I think it is ok to merge.

std::move(*actions->getActionsDAG().clone()));
joined_block_actions = std::make_shared<ExpressionActions>(new_dag);
}
}

bool SubqueryForSet::insertJoinedBlock(Block & block)
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/SubqueryForSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct SubqueryForSet
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_);

void setJoinActions(ExpressionActionsPtr actions);
void addJoinActions(ExpressionActionsPtr actions);

bool insertJoinedBlock(Block & block);
void setTotals(Block totals);
Expand Down
Loading