1- #include < Interpreters/AsynchronousInsertQueue.h>
2- #include < Interpreters/Squashing.h>
3- #include < Parsers/ASTInsertQuery.h>
41#include < algorithm>
52#include < exception>
63#include < memory>
74#include < mutex>
8- #include < vector>
95#include < string_view>
10- #include < Poco/Net/NetException.h>
11- #include < Poco/Net/SocketAddress.h>
12- #include < Poco/Util/LayeredConfiguration.h>
13- #include < Common/CurrentThread.h>
14- #include < Common/Stopwatch.h>
15- #include < Common/NetException.h>
16- #include < Common/setThreadName.h>
17- #include < Common/OpenSSLHelpers.h>
18- #include < IO/Progress.h>
6+ #include < vector>
7+ #include < Access/AccessControl.h>
8+ #include < Access/Credentials.h>
199#include < Compression/CompressedReadBuffer.h>
2010#include < Compression/CompressedWriteBuffer.h>
21- #include < IO/ReadBufferFromPocoSocket.h>
22- #include < IO/WriteBufferFromPocoSocket.h>
11+ #include < Compression/CompressionFactory.h>
12+ #include < Core/ExternalTable.h>
13+ #include < Core/ServerSettings.h>
14+ #include < Formats/NativeReader.h>
15+ #include < Formats/NativeWriter.h>
2316#include < IO/LimitReadBuffer.h>
17+ #include < IO/Progress.h>
18+ #include < IO/ReadBufferFromPocoSocket.h>
2419#include < IO/ReadHelpers.h>
20+ #include < IO/WriteBufferFromPocoSocket.h>
2521#include < IO/WriteHelpers.h>
26- #include < Formats/NativeReader.h>
27- #include < Formats/NativeWriter.h>
28- #include < Interpreters/executeQuery.h>
29- #include < Interpreters/TablesStatus.h>
22+ #include < Interpreters/AsynchronousInsertQueue.h>
3023#include < Interpreters/InternalTextLogsQueue.h>
3124#include < Interpreters/OpenTelemetrySpanLog.h>
3225#include < Interpreters/Session.h>
26+ #include < Interpreters/Squashing.h>
27+ #include < Interpreters/TablesStatus.h>
28+ #include < Interpreters/executeQuery.h>
29+ #include < Parsers/ASTInsertQuery.h>
3330#include < Server/TCPServer.h>
34- #include < Storages/StorageReplicatedMergeTree.h>
3531#include < Storages/MergeTree/MergeTreeDataPartUUID.h>
3632#include < Storages/ObjectStorage/StorageObjectStorageCluster.h>
37- #include < Core/ExternalTable.h>
38- #include < Core/ServerSettings.h>
39- #include < Access/AccessControl.h>
40- #include < Access/Credentials.h>
41- #include < Compression/CompressionFactory.h>
42- #include < Common/logger_useful.h>
33+ #include < Storages/StorageReplicatedMergeTree.h>
34+ #include < Poco/Net/NetException.h>
35+ #include < Poco/Net/SocketAddress.h>
36+ #include < Poco/Util/LayeredConfiguration.h>
4337#include < Common/CurrentMetrics.h>
38+ #include < Common/CurrentThread.h>
39+ #include < Common/NetException.h>
40+ #include < Common/OpenSSLHelpers.h>
41+ #include < Common/Stopwatch.h>
42+ #include < Common/logger_useful.h>
43+ #include < Common/scope_guard_safe.h>
44+ #include < Common/setThreadName.h>
4445#include < Common/thread_local_rng.h>
45- #include < fmt/format.h>
4646
4747#include < Processors/Executors/PullingAsyncPipelineExecutor.h>
4848#include < Processors/Executors/PushingPipelineExecutor.h>
6161
6262#include < Common/config_version.h>
6363
64+ #include < fmt/format.h>
65+
6466using namespace std ::literals;
6567using namespace DB ;
6668
@@ -1036,6 +1038,17 @@ void TCPHandler::processOrdinaryQuery()
10361038 PullingAsyncPipelineExecutor executor (pipeline);
10371039 CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
10381040
1041+ // / The following may happen:
1042+ // / * current thread is holding the lock
1043+ // / * because of the exception we unwind the stack and call the destructor of `executor`
1044+ // / * the destructor calls cancel() and waits for all query threads to finish
1045+ // / * at the same time one of the query threads is trying to acquire the lock, e.g. inside `merge_tree_read_task_callback`
1046+ // / * deadlock
1047+ SCOPE_EXIT ({
1048+ if (out_lock.owns_lock ())
1049+ out_lock.unlock ();
1050+ });
1051+
10391052 Block block;
10401053 while (executor.pull (block, interactive_delay / 1000 ))
10411054 {
@@ -1079,8 +1092,7 @@ void TCPHandler::processOrdinaryQuery()
10791092 }
10801093
10811094 // / This lock wasn't acquired before and we make .lock() call here
1082- // / so everything under this line is covered even together
1083- // / with sendProgress() out of the scope
1095+ // / so everything under this line is covered.
10841096 out_lock.lock ();
10851097
10861098 /* * If data has run out, we will send the profiling data and total values to
@@ -1107,6 +1119,7 @@ void TCPHandler::processOrdinaryQuery()
11071119 last_sent_snapshots.clear ();
11081120 }
11091121
1122+ out_lock.lock ();
11101123 sendProgress ();
11111124}
11121125
0 commit comments