Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>

#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h>
Expand Down Expand Up @@ -73,6 +74,7 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int LOGICAL_ERROR;
}

namespace ActionLocks
Expand Down Expand Up @@ -378,8 +380,77 @@ StoragePtr StorageDistributed::createWithOwnCluster(
}


QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
std::string reason;

if (settings.distributed_group_by_no_merge)
return true;
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
if (to_stage == QueryProcessingStage::WithMergeableState)
return false;
if (!settings.optimize_skip_unused_shards)
return false;
if (!has_sharding_key)
return false;

const auto & select = query_ptr->as<ASTSelectQuery &>();

if (select.orderBy())
return false;

if (select.distinct)
{
for (auto & expr : select.select()->children)
{
auto id = expr->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
}

reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
}

// This can use distributed_group_by_no_merge but in this case limit stage
// should be done later (which is not the case right now).
if (select.limitBy() || select.limitLength())
return false;

const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return false;
}
else
{
// injective functions are optimized out in optimizeGroupBy()
// hence all we need to check is that column in GROUP BY matches sharding expression
auto & group_exprs = group_by->children;
if (group_exprs.empty())
throw Exception("No ASTExpressionList in GROUP BY", ErrorCodes::LOGICAL_ERROR);

auto id = group_exprs[0]->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;

reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
}

LOG_DEBUG(log, "Force distributed_group_by_no_merge for " << reason << " (injective)");
return true;
}

QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
if (canForceGroupByNoMerge(context, to_stage, query_ptr))
return QueryProcessingStage::Complete;

auto cluster = getOptimizedCluster(context, query_ptr);
return getQueryProcessingStageImpl(context, to_stage, cluster);
}
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/StorageDistributed.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute

bool isRemote() const override { return true; }

QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
/// Return true if distributed_group_by_no_merge may be applied.
bool canForceGroupByNoMerge(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;

Pipes read(
const Names & column_names,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ optimize_skip_unused_shards
optimize_skip_unused_shards lack of WHERE
0
1
0
1
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no
SELECT 'optimize_skip_unused_shards';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
-- check that querying all shards is ok
-- (there will be duplicates, since the INSERT was done via local table)
SELECT 'optimize_skip_unused_shards lack of WHERE';
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
-
0
1
0
1
optimize_skip_unused_shards
0
1
0
1
GROUP BY number
1 0
1 1
1 0
1 1
GROUP BY number distributed_group_by_no_merge
1 0
1 1
1 0
1 1
GROUP BY number, 1
1 0
1 1
1 0
1 1
GROUP BY 1
4 0
GROUP BY number ORDER BY number DESC
2 1
2 0
GROUP BY toString(number)
1 0
1 1
1 0
1 1
GROUP BY number%2
2 0
2 1
countDistinct
2
countDistinct GROUP BY number
1
1
1
1
DISTINCT
0
1
0
1
HAVING
LIMIT
2 0
2 1
LIMIT BY
2 0
2 1
GROUP BY (Distributed-over-Distributed)
4 0
4 1
GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
1 0
1 1
1 0
1 1
1 0
1 1
1 0
1 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
drop table if exists dist_01247;
drop table if exists data_01247;

create table data_01247 as system.numbers engine=Memory();
insert into data_01247 select * from system.numbers limit 2;
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
-- since data is not inserted via distributed it will have duplicates
-- (and this is how we ensure that this optimization will work)

set max_distributed_connections=1;

select '-';
select * from dist_01247;

select 'optimize_skip_unused_shards';
set optimize_skip_unused_shards=1;
select * from dist_01247;

select 'GROUP BY number';
select count(), * from dist_01247 group by number;
select 'GROUP BY number distributed_group_by_no_merge';
select count(), * from dist_01247 group by number settings distributed_group_by_no_merge=1;

-- dumb, but should work, since "GROUP BY 1" optimized out
select 'GROUP BY number, 1';
select count(), * from dist_01247 group by number, 1;
select 'GROUP BY 1';
select count(), min(number) from dist_01247 group by 1;

select 'GROUP BY number ORDER BY number DESC';
select count(), * from dist_01247 group by number order by number desc;

select 'GROUP BY toString(number)';
select count(), * from dist_01247 group by toString(number);

select 'GROUP BY number%2';
select count(), any(number) from dist_01247 group by number%2;

select 'countDistinct';
select count(DISTINCT number) from dist_01247;

select 'countDistinct GROUP BY number';
select count(DISTINCT number) from dist_01247 group by number;

select 'DISTINCT';
select DISTINCT number from dist_01247;

select 'HAVING';
select count() cnt, * from dist_01247 group by number having cnt < 0;

select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select count(), * from dist_01247 group by number limit 1 offset 1;

select 'LIMIT BY';
select count(), * from dist_01247 group by number limit 0 by number;
select count(), * from dist_01247 group by number limit 1 by number;

select 'GROUP BY (Distributed-over-Distributed)';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;