Skip to content

Commit 0f3a53c

Browse files
Backport #78041 to 25.4: Distributed INSERT SELECT into ReplicatedMergeTree
1 parent d9ac1a5 commit 0f3a53c

27 files changed

+770
-161
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6304,6 +6304,9 @@ Index analysis done only on replica-coordinator and skipped on other replicas. E
63046304
)", BETA) \
63056305
DECLARE(Bool, parallel_replicas_only_with_analyzer, true, R"(
63066306
The analyzer should be enabled to use parallel replicas. With disabled analyzer query execution fallbacks to local execution, even if parallel reading from replicas is enabled. Using parallel replicas without the analyzer enabled is not supported
6307+
)", BETA) \
6308+
DECLARE(Bool, parallel_replicas_insert_select_local_pipeline, false, R"(
6309+
Use local pipeline during distributed INSERT SELECT with parallel replicas
63076310
)", BETA) \
63086311
DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"(
63096312
Replace table function engines with their -Cluster alternatives

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
9090
{"iceberg_snapshot_id", 0, 0, "New setting."},
9191
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
9292
{"query_plan_join_shard_by_pk_ranges", false, false, "New setting"},
93+
{"parallel_replicas_insert_select_local_pipeline", false, false, "Use local pipeline during distributed INSERT SELECT with parallel replicas. Currently disabled due to performance issues"},
9394
/// Release closed. Please use 25.5
9495
});
9596
addSettingsChanges(settings_changes_history, "25.3",

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 307 additions & 69 deletions
Large diffs are not rendered by default.

src/Interpreters/ClusterProxy/executeQuery.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include <Interpreters/Context_fwd.h>
55
#include <Parsers/IAST_fwd.h>
66

7+
#include <optional>
8+
79
namespace DB
810
{
911

@@ -38,6 +40,13 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
3840
class IQueryPlanStep;
3941
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
4042

43+
class ASTInsertQuery;
44+
45+
class QueryPipeline;
46+
47+
class ParallelReplicasReadingCoordinator;
48+
using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;
49+
4150
namespace ClusterProxy
4251
{
4352

@@ -58,7 +67,9 @@ using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
5867
AdditionalShardFilterGenerator
5968
getShardFilterGeneratorForCustomKey(const Cluster & cluster, ContextPtr context, const ColumnsDescription & columns);
6069

70+
bool isSuitableForParallelReplicas(const ASTPtr & select, const ContextPtr & context);
6171
bool canUseParallelReplicasOnInitiator(const ContextPtr & context);
72+
ParallelReplicasReadingCoordinatorPtr dropReadFromRemoteInPlan(QueryPlan & query_plan);
6273

6374
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
6475
/// `stream_factory` object encapsulates the logic of creating plans for a different type of query
@@ -79,6 +90,12 @@ void executeQuery(
7990
AdditionalShardFilterGenerator shard_filter_generator,
8091
bool is_remote_function);
8192

93+
std::optional<QueryPipeline> executeInsertSelectWithParallelReplicas(
94+
const ASTInsertQuery & query_ast,
95+
const ContextPtr & context,
96+
std::optional<QueryPipeline> pipeline = std::nullopt,
97+
std::optional<ParallelReplicasReadingCoordinatorPtr> coordinator = std::nullopt);
98+
8299
void executeQueryWithParallelReplicas(
83100
QueryPlan & query_plan,
84101
const StorageID & storage_id,

src/Interpreters/InterpreterInsertQuery.cpp

Lines changed: 174 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <Interpreters/Context_fwd.h>
2222
#include <Interpreters/ExpressionActions.h>
2323
#include <Interpreters/createSubcolumnsExtractionActions.h>
24+
#include <Interpreters/ClusterProxy/executeQuery.h>
2425
#include <Parsers/ASTConstraintDeclaration.h>
2526
#include <Parsers/ASTIdentifier.h>
2627
#include <Parsers/ASTFunction.h>
@@ -80,6 +81,9 @@ namespace Setting
8081
extern const SettingsSeconds lock_acquire_timeout;
8182
extern const SettingsUInt64 parallel_distributed_insert_select;
8283
extern const SettingsBool enable_parsing_to_custom_serialization;
84+
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
85+
extern const SettingsBool parallel_replicas_local_plan;
86+
extern const SettingsBool parallel_replicas_insert_select_local_pipeline;
8387
}
8488

8589
namespace MergeTreeSetting
@@ -518,76 +522,29 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
518522
return {std::move(presink_chains), std::move(sink_chains)};
519523
}
520524

525+
static std::pair<QueryPipelineBuilder, ParallelReplicasReadingCoordinatorPtr>
526+
getLocalSelectPipelineForInserSelectWithParallelReplicas(const ASTPtr & select, const ContextPtr & context)
527+
{
528+
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, /*subquery_depth_=*/1);
521529

522-
QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table)
530+
InterpreterSelectQueryAnalyzer interpreter(select, context, select_query_options);
531+
auto & plan = interpreter.getQueryPlan();
532+
533+
/// Find reading steps for remote replicas and remove them,
534+
/// When building local pipeline, the local replica will be registered in the returned coordinator,
535+
/// and announce its snapshot. The snapshot will be used to assign read tasks to involved replicas
536+
/// So, the remote pipelines, which will be created later, should use the same coordinator
537+
auto parallel_replicas_coordinator = ClusterProxy::dropReadFromRemoteInPlan(plan);
538+
return {interpreter.buildQueryPipeline(), parallel_replicas_coordinator};
539+
}
540+
541+
QueryPipeline InterpreterInsertQuery::addInsertToSelectPipeline(ASTInsertQuery & query, StoragePtr table, QueryPipelineBuilder & pipeline)
523542
{
524543
const Settings & settings = getContext()->getSettingsRef();
525544

526545
auto metadata_snapshot = table->getInMemoryMetadataPtr();
527546
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot, getContext(), no_destination, allow_materialized);
528547

529-
bool is_trivial_insert_select = false;
530-
531-
if (settings[Setting::optimize_trivial_insert_select])
532-
{
533-
const auto & select_query = query.select->as<ASTSelectWithUnionQuery &>();
534-
const auto & selects = select_query.list_of_selects->children;
535-
const auto & union_modes = select_query.list_of_modes;
536-
537-
/// ASTSelectWithUnionQuery is not normalized now, so it may pass some queries which can be Trivial select queries
538-
const auto mode_is_all = [](const auto & mode) { return mode == SelectUnionMode::UNION_ALL; };
539-
540-
is_trivial_insert_select =
541-
std::all_of(union_modes.begin(), union_modes.end(), std::move(mode_is_all))
542-
&& std::all_of(selects.begin(), selects.end(), isTrivialSelect);
543-
}
544-
545-
ContextPtr select_context = getContext();
546-
547-
if (is_trivial_insert_select)
548-
{
549-
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
550-
* don't need to process SELECT with more than max_insert_threads
551-
* and it's reasonable to set block size for SELECT to the desired block size for INSERT
552-
* to avoid unnecessary squashing.
553-
*/
554-
555-
Settings new_settings = select_context->getSettingsCopy();
556-
557-
new_settings[Setting::max_threads] = std::max<UInt64>(1, settings[Setting::max_insert_threads]);
558-
559-
if (table->prefersLargeBlocks())
560-
{
561-
if (settings[Setting::min_insert_block_size_rows])
562-
new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
563-
if (settings[Setting::min_insert_block_size_bytes])
564-
new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
565-
}
566-
567-
auto context_for_trivial_select = Context::createCopy(context);
568-
context_for_trivial_select->setSettings(new_settings);
569-
context_for_trivial_select->setInsertionTable(getContext()->getInsertionTable(), getContext()->getInsertionTableColumnNames());
570-
571-
select_context = context_for_trivial_select;
572-
}
573-
574-
QueryPipelineBuilder pipeline;
575-
576-
{
577-
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);
578-
579-
if (settings[Setting::allow_experimental_analyzer])
580-
{
581-
InterpreterSelectQueryAnalyzer interpreter_select_analyzer(query.select, select_context, select_query_options);
582-
pipeline = interpreter_select_analyzer.buildQueryPipeline();
583-
}
584-
else
585-
{
586-
InterpreterSelectWithUnionQuery interpreter_select(query.select, select_context, select_query_options);
587-
pipeline = interpreter_select.buildQueryPipeline();
588-
}
589-
}
590-
591548
pipeline.dropTotalsAndExtremes();
592549

593550
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
@@ -744,6 +701,148 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
744701
return QueryPipelineBuilder::getPipeline(std::move(pipeline));
745702
}
746703

704+
QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table)
705+
{
706+
const Settings & settings = getContext()->getSettingsRef();
707+
708+
bool is_trivial_insert_select = false;
709+
710+
if (settings[Setting::optimize_trivial_insert_select])
711+
{
712+
const auto & select_query = query.select->as<ASTSelectWithUnionQuery &>();
713+
const auto & selects = select_query.list_of_selects->children;
714+
const auto & union_modes = select_query.list_of_modes;
715+
716+
/// ASTSelectWithUnionQuery is not normalized now, so it may pass some queries which can be Trivial select queries
717+
const auto mode_is_all = [](const auto & mode) { return mode == SelectUnionMode::UNION_ALL; };
718+
719+
is_trivial_insert_select =
720+
std::all_of(union_modes.begin(), union_modes.end(), std::move(mode_is_all))
721+
&& std::all_of(selects.begin(), selects.end(), isTrivialSelect);
722+
}
723+
724+
ContextPtr select_context = getContext();
725+
726+
if (is_trivial_insert_select)
727+
{
728+
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
729+
* don't need to process SELECT with more than max_insert_threads
730+
* and it's reasonable to set block size for SELECT to the desired block size for INSERT
731+
* to avoid unnecessary squashing.
732+
*/
733+
734+
Settings new_settings = select_context->getSettingsCopy();
735+
736+
new_settings[Setting::max_threads] = std::max<UInt64>(1, settings[Setting::max_insert_threads]);
737+
738+
if (table->prefersLargeBlocks())
739+
{
740+
if (settings[Setting::min_insert_block_size_rows])
741+
new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
742+
if (settings[Setting::min_insert_block_size_bytes])
743+
new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
744+
}
745+
746+
auto context_for_trivial_select = Context::createCopy(context);
747+
context_for_trivial_select->setSettings(new_settings);
748+
context_for_trivial_select->setInsertionTable(getContext()->getInsertionTable(), getContext()->getInsertionTableColumnNames());
749+
750+
select_context = context_for_trivial_select;
751+
}
752+
753+
QueryPipelineBuilder pipeline;
754+
755+
{
756+
auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);
757+
758+
if (settings[Setting::allow_experimental_analyzer])
759+
{
760+
InterpreterSelectQueryAnalyzer interpreter_select_analyzer(query.select, select_context, select_query_options);
761+
pipeline = interpreter_select_analyzer.buildQueryPipeline();
762+
}
763+
else
764+
{
765+
InterpreterSelectWithUnionQuery interpreter_select(query.select, select_context, select_query_options);
766+
pipeline = interpreter_select.buildQueryPipeline();
767+
}
768+
}
769+
770+
return addInsertToSelectPipeline(query, table, pipeline);
771+
}
772+
773+
std::pair<QueryPipeline, ParallelReplicasReadingCoordinatorPtr>
774+
InterpreterInsertQuery::buildLocalInsertSelectPipelineForParallelReplicas(ASTInsertQuery & query, const StoragePtr & table)
775+
{
776+
ContextPtr context_ptr = getContext();
777+
778+
const Settings & settings = context_ptr->getSettingsRef();
779+
780+
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
781+
* don't need to process SELECT with more than max_insert_threads
782+
* and it's reasonable to set block size for SELECT to the desired block size for INSERT
783+
* to avoid unnecessary squashing.
784+
*/
785+
786+
Settings new_settings = context_ptr->getSettingsCopy();
787+
788+
new_settings[Setting::max_threads] = std::max<UInt64>(1, settings[Setting::max_insert_threads] + 1);
789+
790+
if (table->prefersLargeBlocks())
791+
{
792+
if (settings[Setting::min_insert_block_size_rows])
793+
new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
794+
if (settings[Setting::min_insert_block_size_bytes])
795+
new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
796+
}
797+
798+
auto context_for_trivial_select = Context::createCopy(context_ptr);
799+
context_for_trivial_select->setSettings(new_settings);
800+
context_for_trivial_select->setInsertionTable(context_ptr->getInsertionTable(), context_ptr->getInsertionTableColumnNames());
801+
802+
auto [pipeline_builder, coordinator]
803+
= getLocalSelectPipelineForInserSelectWithParallelReplicas(query.select, context_for_trivial_select);
804+
auto local_pipeline = addInsertToSelectPipeline(query, table, pipeline_builder);
805+
return {std::move(local_pipeline), coordinator};
806+
}
807+
808+
std::optional<QueryPipeline> InterpreterInsertQuery::buildInsertSelectPipelineParallelReplicas(ASTInsertQuery & query, StoragePtr table)
809+
{
810+
auto context_ptr = getContext();
811+
const Settings & settings = context_ptr->getSettingsRef();
812+
if (!settings[Setting::allow_experimental_analyzer])
813+
return {};
814+
815+
if (!context_ptr->canUseParallelReplicasOnInitiator())
816+
return {};
817+
818+
if (settings[Setting::parallel_distributed_insert_select] != 2)
819+
return {};
820+
821+
const auto & select_query = query.select->as<ASTSelectWithUnionQuery &>();
822+
const auto & selects = select_query.list_of_selects->children;
823+
if (selects.size() > 1)
824+
return {};
825+
826+
if (!isTrivialSelect(selects.front()))
827+
return {};
828+
829+
if (!ClusterProxy::isSuitableForParallelReplicas(selects.front(), context_ptr))
830+
return {};
831+
832+
LOG_TRACE(
833+
getLogger("InterpreterInsertQuery"),
834+
"Building distributed insert select pipeline with parallel replicas: table={}",
835+
query.getTable());
836+
837+
if (settings[Setting::parallel_replicas_local_plan] && settings[Setting::parallel_replicas_insert_select_local_pipeline])
838+
{
839+
auto [local_pipeline, coordinator] = buildLocalInsertSelectPipelineForParallelReplicas(query, table);
840+
return ClusterProxy::executeInsertSelectWithParallelReplicas(query, context_ptr, std::move(local_pipeline), coordinator);
841+
}
842+
843+
return ClusterProxy::executeInsertSelectWithParallelReplicas(query, context_ptr);
844+
}
845+
747846

748847
QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query, StoragePtr table)
749848
{
@@ -822,10 +921,11 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
822921

823922
BlockIO InterpreterInsertQuery::execute()
824923
{
825-
const Settings & settings = getContext()->getSettingsRef();
924+
auto context = getContext();
925+
const Settings & settings = context->getSettingsRef();
826926
auto & query = query_ptr->as<ASTInsertQuery &>();
827927

828-
if (getContext()->getServerSettings()[ServerSetting::disable_insertion_and_mutation]
928+
if (context->getServerSettings()[ServerSetting::disable_insertion_and_mutation]
829929
&& query.table_id.database_name != DatabaseCatalog::SYSTEM_DATABASE
830930
&& query.table_id.database_name != DatabaseCatalog::TEMPORARY_DATABASE)
831931
{
@@ -840,15 +940,15 @@ BlockIO InterpreterInsertQuery::execute()
840940
if (query.partition_by && !table->supportsPartitionBy())
841941
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
842942

843-
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings[Setting::lock_acquire_timeout]);
943+
auto table_lock = table->lockForShare(context->getInitialQueryId(), settings[Setting::lock_acquire_timeout]);
844944

845945
auto metadata_snapshot = table->getInMemoryMetadataPtr();
846-
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot, getContext(), no_destination, allow_materialized);
946+
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot, context, no_destination, allow_materialized);
847947

848948
/// For table functions we check access while executing
849949
/// getTable() -> ITableFunction::execute().
850950
if (!query.table_function)
851-
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
951+
context->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
852952

853953
if (!allow_materialized)
854954
{
@@ -858,25 +958,24 @@ BlockIO InterpreterInsertQuery::execute()
858958
}
859959

860960
BlockIO res;
861-
862961
if (query.select)
863962
{
864963
if (settings[Setting::parallel_distributed_insert_select])
865964
{
866-
auto distributed = table->distributedWrite(query, getContext());
965+
auto distributed = table->distributedWrite(query, context);
867966
if (distributed)
868967
{
869968
res.pipeline = std::move(*distributed);
870969
}
871-
else
970+
if (!res.pipeline.initialized() && context->canUseParallelReplicasOnInitiator())
872971
{
873-
res.pipeline = buildInsertSelectPipeline(query, table);
972+
auto pipeline = buildInsertSelectPipelineParallelReplicas(query, table);
973+
if (pipeline)
974+
res.pipeline = std::move(*pipeline);
874975
}
875976
}
876-
else
877-
{
977+
if (!res.pipeline.initialized())
878978
res.pipeline = buildInsertSelectPipeline(query, table);
879-
}
880979
}
881980
else
882981
{

0 commit comments

Comments
 (0)