Skip to content

Commit 95e1ecd

Browse files
nerve-botclaude
andcommitted
Fix ReadBuffer canceled assertion in TCPHandler idle loop
TCPHandler::runImpl() idle loop called in->eof() on a ReadBuffer that had been canceled by a previous query's pipeline callback. In debug and sanitizer builds, ReadBuffer::next() has chassert(!isCanceled()) which aborts the server process. This was observed as recurring "Logical error: ReadBuffer is canceled" crashes in stress tests (amd_tsan, amd_msan, amd_debug) — STID 2508-2913, 2508-25af. Add ReadBuffer::eofOrCanceled() which checks isCanceled() before eof(), preventing the assertion. TCPHandler idle loop now uses eofOrCanceled(). Also add explicit rethrowExceptionIfHas() after cancel() in PullingAsyncPipelineExecutor::pull() as a defensive measure for the is_execution_finished path. Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 5db5478 commit 95e1ecd

File tree

4 files changed

+98
-3
lines changed

4 files changed

+98
-3
lines changed

src/IO/ReadBuffer.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ class ReadBuffer : public BufferBase
8181
return !hasPendingData() && !next();
8282
}
8383

84+
/// Safe EOF check that handles canceled buffers. Returns true if the buffer
85+
/// is canceled OR at EOF. Use this instead of eof() when the buffer might
86+
/// have been canceled by a failed read (e.g., a broken TCP connection
87+
/// detected inside a pipeline callback). Calling eof() directly on a
88+
/// canceled buffer triggers chassert(!isCanceled()) in debug/sanitizer
89+
/// builds — see ReadBuffer::next().
90+
bool ALWAYS_INLINE eofOrCanceled()
91+
{
92+
return isCanceled() || eof();
93+
}
94+
8495
void ignore()
8596
{
8697
if (!eof())
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include <gtest/gtest.h>
2+
3+
#include <string>
4+
#include <IO/ReadBuffer.h>
5+
#include <IO/ReadBufferFromString.h>
6+
7+
/// Regression test for the "ReadBuffer is canceled" crash (STID 2508-2913).
8+
///
9+
/// Bug: TCPHandler::runImpl() idle loop called in->eof() on a ReadBuffer that
10+
/// had been canceled by a previous query's pipeline callback. In debug/sanitizer
11+
/// builds, ReadBuffer::next() has chassert(!isCanceled()) which aborts the
12+
/// server process.
13+
///
14+
/// Fix: Added ReadBuffer::eofOrCanceled() which checks isCanceled() BEFORE
15+
/// calling eof(). TCPHandler now uses eofOrCanceled() in the idle loop.
16+
17+
using namespace DB;
18+
19+
/// This test calls the PRODUCTION method ReadBuffer::eofOrCanceled() on a
20+
/// canceled buffer. Without the isCanceled() check inside eofOrCanceled(),
21+
/// eof() would be called on the canceled buffer, triggering
22+
/// chassert(!isCanceled()) → abort → test FAILS (crash).
23+
///
24+
/// With the fix (isCanceled() short-circuits before eof()), the test PASSES.
25+
TEST(ReadBufferCancelTest, EofOrCanceledOnCanceledBuffer)
26+
{
27+
std::string data = "some data in the buffer";
28+
ReadBufferFromString buf(data);
29+
30+
/// Consume all pending data so hasPendingData() returns false.
31+
/// This simulates a TCP ReadBuffer where all received bytes have been
32+
/// processed and eof() would need to call next() to check for more.
33+
buf.position() = buf.buffer().end();
34+
35+
/// Simulate what happens when a callback's read from the TCP buffer
36+
/// fails: ReadBuffer::next() catches the exception from nextImpl()
37+
/// and calls cancel(), setting canceled = true.
38+
buf.cancel();
39+
40+
/// This is the exact method called by TCPHandler::runImpl() idle loop.
41+
/// If eofOrCanceled() doesn't check isCanceled() first, this crashes
42+
/// with "ReadBuffer is canceled" in debug/sanitizer builds.
43+
EXPECT_TRUE(buf.eofOrCanceled());
44+
}
45+
46+
/// Verify eofOrCanceled() behaves like eof() on a non-canceled buffer.
47+
TEST(ReadBufferCancelTest, EofOrCanceledOnNormalBuffer)
48+
{
49+
std::string data = "test data";
50+
ReadBufferFromString buf(data);
51+
52+
/// Buffer has pending data — not at EOF
53+
EXPECT_FALSE(buf.eofOrCanceled());
54+
55+
/// Consume all data
56+
buf.position() = buf.buffer().end();
57+
58+
/// Now at EOF (no more data, not canceled)
59+
EXPECT_TRUE(buf.eofOrCanceled());
60+
}
61+
62+
/// Verify that cancel() is persistent and isCanceled() reflects the state.
63+
TEST(ReadBufferCancelTest, CancelIsPersistent)
64+
{
65+
std::string data = "test";
66+
ReadBufferFromString buf(data);
67+
68+
EXPECT_FALSE(buf.isCanceled());
69+
buf.cancel();
70+
EXPECT_TRUE(buf.isCanceled());
71+
72+
/// Cancel is permanent — by design, a canceled TCP connection cannot be reused
73+
EXPECT_TRUE(buf.isCanceled());
74+
}

src/Processors/Executors/PullingAsyncPipelineExecutor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,13 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
115115
{
116116
/// If lazy format is finished, we don't cancel pipeline but wait for main thread to be finished.
117117
data->is_finished = true;
118-
/// Wait thread and rethrow exception if any.
118+
/// Wait for the pipeline thread to finish.
119119
cancel();
120+
/// Rethrow any exception from the pipeline thread. Without this check,
121+
/// exceptions from pipeline callbacks (e.g. ClusterFunctionReadTaskCallback
122+
/// canceling the TCP ReadBuffer) would be silently lost when pull() returns
123+
/// false, causing the caller to believe the query succeeded.
124+
data->rethrowExceptionIfHas();
120125
return false;
121126
}
122127

src/Server/TCPHandler.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,9 +492,14 @@ void TCPHandler::runImpl()
492492
}
493493

494494
/// If we need to shut down, or client disconnects.
495-
if (!tcp_server.isOpen() || server.isCancelled() || in->eof())
495+
/// Use eofOrCanceled() instead of eof() as a safety net: if a previous
496+
/// query's pipeline callback canceled the ReadBuffer but the exception
497+
/// was not propagated, we must close the connection rather than hitting
498+
/// chassert(!isCanceled()) inside eof().
499+
if (!tcp_server.isOpen() || server.isCancelled() || in->eofOrCanceled())
496500
{
497-
LOG_TEST(log, "Closing connection (open: {}, cancelled: {}, eof: {})", tcp_server.isOpen(), server.isCancelled(), in->eof());
501+
LOG_TEST(log, "Closing connection (open: {}, cancelled: {}, eof_or_canceled: {})",
502+
tcp_server.isOpen(), server.isCancelled(), in->eofOrCanceled());
498503
return;
499504
}
500505
}

0 commit comments

Comments
 (0)