File tree Expand file tree Collapse file tree 4 files changed +9
-5
lines changed
Expand file tree Collapse file tree 4 files changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -93,8 +93,11 @@ ISource::Status RemoteSource::prepare()
9393 if (is_async_state)
9494 return Status::Async;
9595
96- if (executor_finished)
96+ if (query_executor->isFinished ())
97+ {
98+ getPort ().finish ();
9799 return Status::Finished;
100+ }
98101
99102 Status status = ISource::prepare ();
100103 // / To avoid resetting the connection (because of "unfinished" query) in the
@@ -126,7 +129,6 @@ void RemoteSource::work()
126129 if (need_drain)
127130 {
128131 query_executor->finish ();
129- executor_finished = true ;
130132 return ;
131133 }
132134
@@ -248,9 +250,7 @@ void RemoteSource::onCancel() noexcept
248250void RemoteSource::onUpdatePorts ()
249251{
250252 if (getPort ().isFinished ())
251- {
252253 query_executor->finish ();
253- }
254254}
255255
256256
Original file line number Diff line number Diff line change @@ -48,7 +48,6 @@ class RemoteSource final : public ISource
4848private:
4949 std::atomic_bool was_query_sent = false ;
5050 bool need_drain = false ;
51- bool executor_finished = false ;
5251 bool add_aggregation_info = false ;
5352 RemoteQueryExecutorPtr query_executor;
5453 RowsBeforeStepCounterPtr rows_before_limit;
Original file line number Diff line number Diff line change @@ -778,6 +778,9 @@ void RemoteQueryExecutor::finish()
778778{
779779 LockAndBlocker guard (was_cancelled_mutex);
780780
781+ // / To make sure finish is only called once
782+ SCOPE_EXIT ({ finished = true ; });
783+
781784 /* * If one of:
782785 * - nothing started to do;
783786 * - received all packets before EndOfStream;
Original file line number Diff line number Diff line change @@ -221,6 +221,8 @@ class RemoteQueryExecutor
221221 // / return true if parallel replica packet was processed
222222 bool processParallelReplicaPacketIfAny ();
223223
224+ bool isFinished () const { return finished; }
225+
224226private:
225227 RemoteQueryExecutor (
226228 const String & query_,
You can’t perform that action at this time.
0 commit comments