Cherry pick #17006 to 20.8: Fix Distributed queries finishing (avoid connection resets)#17118
Merged
KochetovNicolai merged 6 commits intobackport/20.8/17006from Nov 19, 2020
Conversation
With optimize_distributed_group_by_sharding_key it is possible to get enough rows even before sending query to another shard and in this case it shouldn't send anything to others. This reduces NETWORK_ERROR in the 01563_distributed_query_finish from 2 to 1.
This patch fixes the 01563_distributed_query_finish test.
Since onCancel() and generate() can be called from different threads
TSAN reports:
WARNING: ThreadSanitizer: data race (pid=253)
Write of size 1 at 0x7b50008c25c2 by thread T144 (mutexes: write M643587328754916744):
#0 DB::RemoteSource::onCancel() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.cpp:79:24 (clickhouse+0x11c2019d)
#1 DB::IProcessor::cancel() /build/obj-x86_64-linux-gnu/../src/Processors/IProcessor.h:235:9 (clickhouse+0x11c21def)
#2 DB::RemoteSource::onUpdatePorts() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.h:32:13 (clickhouse+0x11c21def)
#3 DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::ExecutingGraph::Edge&, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:190:43 (clickhouse+0x11aaf954)
#4 DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, std::__1::unique_lock<std::__1::mutex>) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:296:18 (clickhouse+0x11ab0169)
#5 DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::ExecutingGraph::Edge&, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:187:16 (clickhouse+0x11aaf9c2)
#6 DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, std::__1::unique_lock<std::__1::mutex>) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:296:18 (clickhouse+0x11ab0169)
#7 DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:589:26 (clickhouse+0x11ab2a3e)
#8 DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:477:5 (clickhouse+0x11ab538b)
#9 DB::PipelineExecutor::executeImpl(unsigned long)::$_4::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:736:21 (clickhouse+0x11ab538b)
#10 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(fp)()) std::__1::__invoke_constexpr<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3525:1 (clickhouse+0x11ab538b)
#11 decltype(auto) std::__1::__apply_tuple_impl<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1415:1 (clickhouse+0x11ab538b)
#12 decltype(auto) std::__1::apply<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1424:1 (clickhouse+0x11ab538b)
#13 ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()::operator()() /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:178:13 (clickhouse+0x11ab538b)
#14 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(fp)()) std::__1::__invoke<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab538b)
#15 void std::__1::__invoke_void_return_wrapper<void>::__call<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab538b)
#16 std::__1::__function::__alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab538b)
#17 std::__1::__function::__func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab538b)
#18 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x8346263)
#19 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x8346263)
#20 ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:236:17 (clickhouse+0x8346263)
#21 void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()::operator()() const /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:117:73 (clickhouse+0x8349ea8)
#22 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(fp0)...)) std::__1::__invoke<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(void&&, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x8349ea8)
#23 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(std::__1::tuple<void, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:273:5 (clickhouse+0x8349ea8)
#24 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()> >(void*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:284:5 (clickhouse+0x8349ea8)
Previous read of size 1 at 0x7b50008c25c2 by thread T91:
#0 DB::RemoteSource::generate() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.cpp:35:9 (clickhouse+0x11c1fb9e)
#1 DB::ISource::work() /build/obj-x86_64-linux-gnu/../src/Processors/ISource.cpp:48:31 (clickhouse+0x11a6c852)
#2 DB::SourceWithProgress::work() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/SourceWithProgress.cpp:36:30 (clickhouse+0x11c26d1a)
#3 DB::executeJob(DB::IProcessor*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:78:20 (clickhouse+0x11ab4836)
#4 DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:95:13 (clickhouse+0x11ab4836)
#5 decltype(std::__1::forward<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(fp)()) std::__1::__invoke<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab4836)
#6 void std::__1::__invoke_void_return_wrapper<void>::__call<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab4836)
#7 std::__1::__function::__alloc_func<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0, std::__1::allocator<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab4836)
#8 std::__1::__function::__func<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0, std::__1::allocator<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab4836)
#9 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x11ab2801)
#10 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x11ab2801)
#11 DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:561:17 (clickhouse+0x11ab2801)
#12 DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:477:5 (clickhouse+0x11ab538b)
#13 DB::PipelineExecutor::executeImpl(unsigned long)::$_4::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:736:21 (clickhouse+0x11ab538b)
#14 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(fp)()) std::__1::__invoke_constexpr<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3525:1 (clickhouse+0x11ab538b)
#15 decltype(auto) std::__1::__apply_tuple_impl<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1415:1 (clickhouse+0x11ab538b)
#16 decltype(auto) std::__1::apply<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1424:1 (clickhouse+0x11ab538b)
#17 ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()::operator()() /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:178:13 (clickhouse+0x11ab538b)
#18 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(fp)()) std::__1::__invoke<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab538b)
#19 void std::__1::__invoke_void_return_wrapper<void>::__call<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab538b)
#20 std::__1::__function::__alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab538b)
#21 std::__1::__function::__func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab538b)
#22 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x8346263)
#23 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x8346263)
#24 ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:236:17 (clickhouse+0x8346263)
#25 void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()::operator()() const /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:117:73 (clickhouse+0x8349ea8)
#26 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(fp0)...)) std::__1::__invoke<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(void&&, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x8349ea8)
#27 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(std::__1::tuple<void, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:273:5 (clickhouse+0x8349ea8)
#28 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()> >(void*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:284:5 (clickhouse+0x8349ea8)
Fix Distributed queries finishing (avoid connection resets)
…4d340bf4a67b8d307e9ee6fc5f0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Original pull-request #17006
This pull-request is a first step of an automated backporting.
It contains changes like after calling a local command
git cherry-pick.If you intend to continue backporting this changes, then resolve all conflicts if any.
Otherwise, if you do not want to backport them, then just close this pull-request.
The check results does not matter at this step - you can safely ignore them.
Also this pull-request will be merged automatically as it reaches the mergeable state, but you always can merge it manually.