Skip to content

Commit ea61af9

Browse files
committed
impl
1 parent ab43fe9 commit ea61af9

File tree

1 file changed

+42
-29
lines changed

1 file changed

+42
-29
lines changed

src/Server/TCPHandler.cpp

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,48 @@
1-
#include <Interpreters/AsynchronousInsertQueue.h>
2-
#include <Interpreters/Squashing.h>
3-
#include <Parsers/ASTInsertQuery.h>
41
#include <algorithm>
52
#include <exception>
63
#include <memory>
74
#include <mutex>
8-
#include <vector>
95
#include <string_view>
10-
#include <Poco/Net/NetException.h>
11-
#include <Poco/Net/SocketAddress.h>
12-
#include <Poco/Util/LayeredConfiguration.h>
13-
#include <Common/CurrentThread.h>
14-
#include <Common/Stopwatch.h>
15-
#include <Common/NetException.h>
16-
#include <Common/setThreadName.h>
17-
#include <Common/OpenSSLHelpers.h>
18-
#include <IO/Progress.h>
6+
#include <vector>
7+
#include <Access/AccessControl.h>
8+
#include <Access/Credentials.h>
199
#include <Compression/CompressedReadBuffer.h>
2010
#include <Compression/CompressedWriteBuffer.h>
21-
#include <IO/ReadBufferFromPocoSocket.h>
22-
#include <IO/WriteBufferFromPocoSocket.h>
11+
#include <Compression/CompressionFactory.h>
12+
#include <Core/ExternalTable.h>
13+
#include <Core/ServerSettings.h>
14+
#include <Formats/NativeReader.h>
15+
#include <Formats/NativeWriter.h>
2316
#include <IO/LimitReadBuffer.h>
17+
#include <IO/Progress.h>
18+
#include <IO/ReadBufferFromPocoSocket.h>
2419
#include <IO/ReadHelpers.h>
20+
#include <IO/WriteBufferFromPocoSocket.h>
2521
#include <IO/WriteHelpers.h>
26-
#include <Formats/NativeReader.h>
27-
#include <Formats/NativeWriter.h>
28-
#include <Interpreters/executeQuery.h>
29-
#include <Interpreters/TablesStatus.h>
22+
#include <Interpreters/AsynchronousInsertQueue.h>
3023
#include <Interpreters/InternalTextLogsQueue.h>
3124
#include <Interpreters/OpenTelemetrySpanLog.h>
3225
#include <Interpreters/Session.h>
26+
#include <Interpreters/Squashing.h>
27+
#include <Interpreters/TablesStatus.h>
28+
#include <Interpreters/executeQuery.h>
29+
#include <Parsers/ASTInsertQuery.h>
3330
#include <Server/TCPServer.h>
34-
#include <Storages/StorageReplicatedMergeTree.h>
3531
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
3632
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
37-
#include <Core/ExternalTable.h>
38-
#include <Core/ServerSettings.h>
39-
#include <Access/AccessControl.h>
40-
#include <Access/Credentials.h>
41-
#include <Compression/CompressionFactory.h>
42-
#include <Common/logger_useful.h>
33+
#include <Storages/StorageReplicatedMergeTree.h>
34+
#include <Poco/Net/NetException.h>
35+
#include <Poco/Net/SocketAddress.h>
36+
#include <Poco/Util/LayeredConfiguration.h>
4337
#include <Common/CurrentMetrics.h>
38+
#include <Common/CurrentThread.h>
39+
#include <Common/NetException.h>
40+
#include <Common/OpenSSLHelpers.h>
41+
#include <Common/Stopwatch.h>
42+
#include <Common/logger_useful.h>
43+
#include <Common/scope_guard_safe.h>
44+
#include <Common/setThreadName.h>
4445
#include <Common/thread_local_rng.h>
45-
#include <fmt/format.h>
4646

4747
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
4848
#include <Processors/Executors/PushingPipelineExecutor.h>
@@ -61,6 +61,8 @@
6161

6262
#include <Common/config_version.h>
6363

64+
#include <fmt/format.h>
65+
6466
using namespace std::literals;
6567
using namespace DB;
6668

@@ -1036,6 +1038,17 @@ void TCPHandler::processOrdinaryQuery()
10361038
PullingAsyncPipelineExecutor executor(pipeline);
10371039
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
10381040

1041+
/// The following may happen:
1042+
/// * current thread is holding the lock
1043+
/// * because of the exception we unwind the stack and call the destructor of `executor`
1044+
/// * the destructor calls cancel() and waits for all query threads to finish
1045+
/// * at the same time one of the query threads is trying to acquire the lock, e.g. inside `merge_tree_read_task_callback`
1046+
/// * deadlock
1047+
SCOPE_EXIT({
1048+
if (out_lock.owns_lock())
1049+
out_lock.unlock();
1050+
});
1051+
10391052
Block block;
10401053
while (executor.pull(block, interactive_delay / 1000))
10411054
{
@@ -1079,8 +1092,7 @@ void TCPHandler::processOrdinaryQuery()
10791092
}
10801093

10811094
/// This lock wasn't acquired before and we make .lock() call here
1082-
/// so everything under this line is covered even together
1083-
/// with sendProgress() out of the scope
1095+
/// so everything under this line is covered.
10841096
out_lock.lock();
10851097

10861098
/** If data has run out, we will send the profiling data and total values to
@@ -1107,6 +1119,7 @@ void TCPHandler::processOrdinaryQuery()
11071119
last_sent_snapshots.clear();
11081120
}
11091121

1122+
out_lock.lock();
11101123
sendProgress();
11111124
}
11121125

0 commit comments

Comments
 (0)