Skip to content

Commit 817dca9

Browse files
Backport #92807 to 25.12: Fix crash when receiving from disconnected Connection
1 parent e1a72d7 commit 817dca9

File tree

4 files changed

+9
-5
lines changed

4 files changed

+9
-5
lines changed

src/Processors/Sources/RemoteSource.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@ ISource::Status RemoteSource::prepare()
9393
if (is_async_state)
9494
return Status::Async;
9595

96-
if (executor_finished)
96+
if (query_executor->isFinished())
97+
{
98+
getPort().finish();
9799
return Status::Finished;
100+
}
98101

99102
Status status = ISource::prepare();
100103
/// To avoid resetting the connection (because of "unfinished" query) in the
@@ -126,7 +129,6 @@ void RemoteSource::work()
126129
if (need_drain)
127130
{
128131
query_executor->finish();
129-
executor_finished = true;
130132
return;
131133
}
132134

@@ -248,9 +250,7 @@ void RemoteSource::onCancel() noexcept
248250
void RemoteSource::onUpdatePorts()
249251
{
250252
if (getPort().isFinished())
251-
{
252253
query_executor->finish();
253-
}
254254
}
255255

256256

src/Processors/Sources/RemoteSource.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ class RemoteSource final : public ISource
4848
private:
4949
std::atomic_bool was_query_sent = false;
5050
bool need_drain = false;
51-
bool executor_finished = false;
5251
bool add_aggregation_info = false;
5352
RemoteQueryExecutorPtr query_executor;
5453
RowsBeforeStepCounterPtr rows_before_limit;

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,9 @@ void RemoteQueryExecutor::finish()
778778
{
779779
LockAndBlocker guard(was_cancelled_mutex);
780780

781+
/// To make sure finish is only called once
782+
SCOPE_EXIT({ finished = true; });
783+
781784
/** If one of:
782785
* - nothing started to do;
783786
* - received all packets before EndOfStream;

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ class RemoteQueryExecutor
221221
/// return true if parallel replica packet was processed
222222
bool processParallelReplicaPacketIfAny();
223223

224+
bool isFinished() const { return finished; }
225+
224226
private:
225227
RemoteQueryExecutor(
226228
const String & query_,

0 commit comments

Comments
 (0)