2121#include < Interpreters/Context_fwd.h>
2222#include < Interpreters/ExpressionActions.h>
2323#include < Interpreters/createSubcolumnsExtractionActions.h>
24+ #include < Interpreters/ClusterProxy/executeQuery.h>
2425#include < Parsers/ASTConstraintDeclaration.h>
2526#include < Parsers/ASTIdentifier.h>
2627#include < Parsers/ASTFunction.h>
@@ -80,6 +81,9 @@ namespace Setting
8081 extern const SettingsSeconds lock_acquire_timeout;
8182 extern const SettingsUInt64 parallel_distributed_insert_select;
8283 extern const SettingsBool enable_parsing_to_custom_serialization;
84+ extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
85+ extern const SettingsBool parallel_replicas_local_plan;
86+ extern const SettingsBool parallel_replicas_insert_select_local_pipeline;
8387}
8488
8589namespace MergeTreeSetting
@@ -518,76 +522,29 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
518522 return {std::move (presink_chains), std::move (sink_chains)};
519523}
520524
525+ static std::pair<QueryPipelineBuilder, ParallelReplicasReadingCoordinatorPtr>
526+ getLocalSelectPipelineForInserSelectWithParallelReplicas (const ASTPtr & select, const ContextPtr & context)
527+ {
528+ auto select_query_options = SelectQueryOptions (QueryProcessingStage::Complete, /* subquery_depth_=*/ 1 );
521529
522- QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline (ASTInsertQuery & query, StoragePtr table)
530+ InterpreterSelectQueryAnalyzer interpreter (select, context, select_query_options);
531+ auto & plan = interpreter.getQueryPlan ();
532+
533+ // / Find reading steps for remote replicas and remove them,
534+ // / When building local pipeline, the local replica will be registered in the returned coordinator,
535+ // / and announce its snapshot. The snapshot will be used to assign read tasks to involved replicas
536+ // / So, the remote pipelines, which will be created later, should use the same coordinator
537+ auto parallel_replicas_coordinator = ClusterProxy::dropReadFromRemoteInPlan (plan);
538+ return {interpreter.buildQueryPipeline (), parallel_replicas_coordinator};
539+ }
540+
541+ QueryPipeline InterpreterInsertQuery::addInsertToSelectPipeline (ASTInsertQuery & query, StoragePtr table, QueryPipelineBuilder & pipeline)
523542{
524543 const Settings & settings = getContext ()->getSettingsRef ();
525544
526545 auto metadata_snapshot = table->getInMemoryMetadataPtr ();
527546 auto query_sample_block = getSampleBlock (query, table, metadata_snapshot, getContext (), no_destination, allow_materialized);
528547
529- bool is_trivial_insert_select = false ;
530-
531- if (settings[Setting::optimize_trivial_insert_select])
532- {
533- const auto & select_query = query.select ->as <ASTSelectWithUnionQuery &>();
534- const auto & selects = select_query.list_of_selects ->children ;
535- const auto & union_modes = select_query.list_of_modes ;
536-
537- // / ASTSelectWithUnionQuery is not normalized now, so it may pass some queries which can be Trivial select queries
538- const auto mode_is_all = [](const auto & mode) { return mode == SelectUnionMode::UNION_ALL; };
539-
540- is_trivial_insert_select =
541- std::all_of (union_modes.begin (), union_modes.end (), std::move (mode_is_all))
542- && std::all_of (selects.begin (), selects.end (), isTrivialSelect);
543- }
544-
545- ContextPtr select_context = getContext ();
546-
547- if (is_trivial_insert_select)
548- {
549- /* * When doing trivial INSERT INTO ... SELECT ... FROM table,
550- * don't need to process SELECT with more than max_insert_threads
551- * and it's reasonable to set block size for SELECT to the desired block size for INSERT
552- * to avoid unnecessary squashing.
553- */
554-
555- Settings new_settings = select_context->getSettingsCopy ();
556-
557- new_settings[Setting::max_threads] = std::max<UInt64>(1 , settings[Setting::max_insert_threads]);
558-
559- if (table->prefersLargeBlocks ())
560- {
561- if (settings[Setting::min_insert_block_size_rows])
562- new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
563- if (settings[Setting::min_insert_block_size_bytes])
564- new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
565- }
566-
567- auto context_for_trivial_select = Context::createCopy (context);
568- context_for_trivial_select->setSettings (new_settings);
569- context_for_trivial_select->setInsertionTable (getContext ()->getInsertionTable (), getContext ()->getInsertionTableColumnNames ());
570-
571- select_context = context_for_trivial_select;
572- }
573-
574- QueryPipelineBuilder pipeline;
575-
576- {
577- auto select_query_options = SelectQueryOptions (QueryProcessingStage::Complete, 1 );
578-
579- if (settings[Setting::allow_experimental_analyzer])
580- {
581- InterpreterSelectQueryAnalyzer interpreter_select_analyzer (query.select , select_context, select_query_options);
582- pipeline = interpreter_select_analyzer.buildQueryPipeline ();
583- }
584- else
585- {
586- InterpreterSelectWithUnionQuery interpreter_select (query.select , select_context, select_query_options);
587- pipeline = interpreter_select.buildQueryPipeline ();
588- }
589- }
590-
591548 pipeline.dropTotalsAndExtremes ();
592549
593550 // / Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
@@ -744,6 +701,148 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
744701 return QueryPipelineBuilder::getPipeline (std::move (pipeline));
745702}
746703
704+ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline (ASTInsertQuery & query, StoragePtr table)
705+ {
706+ const Settings & settings = getContext ()->getSettingsRef ();
707+
708+ bool is_trivial_insert_select = false ;
709+
710+ if (settings[Setting::optimize_trivial_insert_select])
711+ {
712+ const auto & select_query = query.select ->as <ASTSelectWithUnionQuery &>();
713+ const auto & selects = select_query.list_of_selects ->children ;
714+ const auto & union_modes = select_query.list_of_modes ;
715+
716+ // / ASTSelectWithUnionQuery is not normalized now, so it may pass some queries which can be Trivial select queries
717+ const auto mode_is_all = [](const auto & mode) { return mode == SelectUnionMode::UNION_ALL; };
718+
719+ is_trivial_insert_select =
720+ std::all_of (union_modes.begin (), union_modes.end (), std::move (mode_is_all))
721+ && std::all_of (selects.begin (), selects.end (), isTrivialSelect);
722+ }
723+
724+ ContextPtr select_context = getContext ();
725+
726+ if (is_trivial_insert_select)
727+ {
728+ /* * When doing trivial INSERT INTO ... SELECT ... FROM table,
729+ * don't need to process SELECT with more than max_insert_threads
730+ * and it's reasonable to set block size for SELECT to the desired block size for INSERT
731+ * to avoid unnecessary squashing.
732+ */
733+
734+ Settings new_settings = select_context->getSettingsCopy ();
735+
736+ new_settings[Setting::max_threads] = std::max<UInt64>(1 , settings[Setting::max_insert_threads]);
737+
738+ if (table->prefersLargeBlocks ())
739+ {
740+ if (settings[Setting::min_insert_block_size_rows])
741+ new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
742+ if (settings[Setting::min_insert_block_size_bytes])
743+ new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
744+ }
745+
746+ auto context_for_trivial_select = Context::createCopy (context);
747+ context_for_trivial_select->setSettings (new_settings);
748+ context_for_trivial_select->setInsertionTable (getContext ()->getInsertionTable (), getContext ()->getInsertionTableColumnNames ());
749+
750+ select_context = context_for_trivial_select;
751+ }
752+
753+ QueryPipelineBuilder pipeline;
754+
755+ {
756+ auto select_query_options = SelectQueryOptions (QueryProcessingStage::Complete, 1 );
757+
758+ if (settings[Setting::allow_experimental_analyzer])
759+ {
760+ InterpreterSelectQueryAnalyzer interpreter_select_analyzer (query.select , select_context, select_query_options);
761+ pipeline = interpreter_select_analyzer.buildQueryPipeline ();
762+ }
763+ else
764+ {
765+ InterpreterSelectWithUnionQuery interpreter_select (query.select , select_context, select_query_options);
766+ pipeline = interpreter_select.buildQueryPipeline ();
767+ }
768+ }
769+
770+ return addInsertToSelectPipeline (query, table, pipeline);
771+ }
772+
773+ std::pair<QueryPipeline, ParallelReplicasReadingCoordinatorPtr>
774+ InterpreterInsertQuery::buildLocalInsertSelectPipelineForParallelReplicas (ASTInsertQuery & query, const StoragePtr & table)
775+ {
776+ ContextPtr context_ptr = getContext ();
777+
778+ const Settings & settings = context_ptr->getSettingsRef ();
779+
780+ /* * When doing trivial INSERT INTO ... SELECT ... FROM table,
781+ * don't need to process SELECT with more than max_insert_threads
782+ * and it's reasonable to set block size for SELECT to the desired block size for INSERT
783+ * to avoid unnecessary squashing.
784+ */
785+
786+ Settings new_settings = context_ptr->getSettingsCopy ();
787+
788+ new_settings[Setting::max_threads] = std::max<UInt64>(1 , settings[Setting::max_insert_threads] + 1 );
789+
790+ if (table->prefersLargeBlocks ())
791+ {
792+ if (settings[Setting::min_insert_block_size_rows])
793+ new_settings[Setting::max_block_size] = settings[Setting::min_insert_block_size_rows];
794+ if (settings[Setting::min_insert_block_size_bytes])
795+ new_settings[Setting::preferred_block_size_bytes] = settings[Setting::min_insert_block_size_bytes];
796+ }
797+
798+ auto context_for_trivial_select = Context::createCopy (context_ptr);
799+ context_for_trivial_select->setSettings (new_settings);
800+ context_for_trivial_select->setInsertionTable (context_ptr->getInsertionTable (), context_ptr->getInsertionTableColumnNames ());
801+
802+ auto [pipeline_builder, coordinator]
803+ = getLocalSelectPipelineForInserSelectWithParallelReplicas (query.select , context_for_trivial_select);
804+ auto local_pipeline = addInsertToSelectPipeline (query, table, pipeline_builder);
805+ return {std::move (local_pipeline), coordinator};
806+ }
807+
808+ std::optional<QueryPipeline> InterpreterInsertQuery::buildInsertSelectPipelineParallelReplicas (ASTInsertQuery & query, StoragePtr table)
809+ {
810+ auto context_ptr = getContext ();
811+ const Settings & settings = context_ptr->getSettingsRef ();
812+ if (!settings[Setting::allow_experimental_analyzer])
813+ return {};
814+
815+ if (!context_ptr->canUseParallelReplicasOnInitiator ())
816+ return {};
817+
818+ if (settings[Setting::parallel_distributed_insert_select] != 2 )
819+ return {};
820+
821+ const auto & select_query = query.select ->as <ASTSelectWithUnionQuery &>();
822+ const auto & selects = select_query.list_of_selects ->children ;
823+ if (selects.size () > 1 )
824+ return {};
825+
826+ if (!isTrivialSelect (selects.front ()))
827+ return {};
828+
829+ if (!ClusterProxy::isSuitableForParallelReplicas (selects.front (), context_ptr))
830+ return {};
831+
832+ LOG_TRACE (
833+ getLogger (" InterpreterInsertQuery" ),
834+ " Building distributed insert select pipeline with parallel replicas: table={}" ,
835+ query.getTable ());
836+
837+ if (settings[Setting::parallel_replicas_local_plan] && settings[Setting::parallel_replicas_insert_select_local_pipeline])
838+ {
839+ auto [local_pipeline, coordinator] = buildLocalInsertSelectPipelineForParallelReplicas (query, table);
840+ return ClusterProxy::executeInsertSelectWithParallelReplicas (query, context_ptr, std::move (local_pipeline), coordinator);
841+ }
842+
843+ return ClusterProxy::executeInsertSelectWithParallelReplicas (query, context_ptr);
844+ }
845+
747846
748847QueryPipeline InterpreterInsertQuery::buildInsertPipeline (ASTInsertQuery & query, StoragePtr table)
749848{
@@ -822,10 +921,11 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
822921
823922BlockIO InterpreterInsertQuery::execute ()
824923{
825- const Settings & settings = getContext ()->getSettingsRef ();
924+ auto context = getContext ();
925+ const Settings & settings = context->getSettingsRef ();
826926 auto & query = query_ptr->as <ASTInsertQuery &>();
827927
828- if (getContext () ->getServerSettings ()[ServerSetting::disable_insertion_and_mutation]
928+ if (context ->getServerSettings ()[ServerSetting::disable_insertion_and_mutation]
829929 && query.table_id .database_name != DatabaseCatalog::SYSTEM_DATABASE
830930 && query.table_id .database_name != DatabaseCatalog::TEMPORARY_DATABASE)
831931 {
@@ -840,15 +940,15 @@ BlockIO InterpreterInsertQuery::execute()
840940 if (query.partition_by && !table->supportsPartitionBy ())
841941 throw Exception (ErrorCodes::NOT_IMPLEMENTED, " PARTITION BY clause is not supported by storage" );
842942
843- auto table_lock = table->lockForShare (getContext () ->getInitialQueryId (), settings[Setting::lock_acquire_timeout]);
943+ auto table_lock = table->lockForShare (context ->getInitialQueryId (), settings[Setting::lock_acquire_timeout]);
844944
845945 auto metadata_snapshot = table->getInMemoryMetadataPtr ();
846- auto query_sample_block = getSampleBlock (query, table, metadata_snapshot, getContext () , no_destination, allow_materialized);
946+ auto query_sample_block = getSampleBlock (query, table, metadata_snapshot, context , no_destination, allow_materialized);
847947
848948 // / For table functions we check access while executing
849949 // / getTable() -> ITableFunction::execute().
850950 if (!query.table_function )
851- getContext () ->checkAccess (AccessType::INSERT, query.table_id , query_sample_block.getNames ());
951+ context ->checkAccess (AccessType::INSERT, query.table_id , query_sample_block.getNames ());
852952
853953 if (!allow_materialized)
854954 {
@@ -858,25 +958,24 @@ BlockIO InterpreterInsertQuery::execute()
858958 }
859959
860960 BlockIO res;
861-
862961 if (query.select )
863962 {
864963 if (settings[Setting::parallel_distributed_insert_select])
865964 {
866- auto distributed = table->distributedWrite (query, getContext () );
965+ auto distributed = table->distributedWrite (query, context );
867966 if (distributed)
868967 {
869968 res.pipeline = std::move (*distributed);
870969 }
871- else
970+ if (!res. pipeline . initialized () && context-> canUseParallelReplicasOnInitiator ())
872971 {
873- res.pipeline = buildInsertSelectPipeline (query, table);
972+ auto pipeline = buildInsertSelectPipelineParallelReplicas (query, table);
973+ if (pipeline)
974+ res.pipeline = std::move (*pipeline);
874975 }
875976 }
876- else
877- {
977+ if (!res.pipeline .initialized ())
878978 res.pipeline = buildInsertSelectPipeline (query, table);
879- }
880979 }
881980 else
882981 {
0 commit comments