
Handle KILL requests while running pipeline executors#26675

Merged
KochetovNicolai merged 2 commits into ClickHouse:master from Algunenano:kill_better
Aug 2, 2021

Conversation

@Algunenano
Member

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Improve handling of KILL QUERY requests.

Detailed description / Documentation draft:

Related to #26554. It addresses KILL not working for those queries, but not the timeout failing to trigger.

Things that I tried before deciding in favor of using a callback:

  • Checking whether the process_list_entry was killed in the handler loop. This worked well, except that it only works for handlers that have an external loop where the flag can be checked (TCPHandler, GRPCServer), leaving the HTTP handler unable to detect it.
  • Adding a pointer to the pipeline executor to QueryStatus, so that it calls cancel() on it when a request is received (similar to what is done with the BlockInputStreamPtr). The problem with this was that there are multiple pipeline executors and they don't share a common base class.
  • The current proposal: add an optional callback to QueryStatus that gets called on cancelQuery. This allows anything to be called in addition to the current behaviour (calling cancel() on the input and output streams), which isn't enough in some cases.

@robot-clickhouse robot-clickhouse added the pr-improvement Pull request with some product improvements label Jul 21, 2021
@Algunenano
Member Author

TSAN found a valid issue: QueryState::is_cancelled was being written and read by two threads at the same time.

GRPC should be fine (Call::want_to_cancel was already atomic), and PipelineExecutor::cancel() should be fine too (everything there is either atomic, accessed under a mutex, or both).

@Algunenano
Member Author

Stress test (thread) — Sanitizer assert (in stderr.log)

Looking at stderr.log, it points to an issue in rocksdb:

WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=432)
    #0 pthread_mutex_unlock <null> (clickhouse+0x952d586)
    #1 rocksdb::port::Mutex::Unlock() obj-x86_64-linux-gnu/../contrib/rocksdb/port/port_posix.cc:90:25 (clickhouse+0x1a7bbe40)
    #2 rocksdb::(anonymous namespace)::PosixFileSystem::LockFile(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, rocksdb::IOOptions const&, rocksdb::FileLock**, rocksdb::IODebugContext*) obj-x86_64-linux-gnu/../contrib/rocksdb/env/fs_posix.cc:817:26 (clickhouse+0x1a7c6ba3)
    #3 rocksdb::CompositeEnv::LockFile(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, rocksdb::FileLock**) obj-x86_64-linux-gnu/../contrib/rocksdb/env/composite_env_wrapper.h:154:26 (clickhouse+0x1a587f54)
...

And looking at the logs, stress_run_logs.tar.gz > stress_test_run_0.txt complains about a data race in ThreadFromGlobalPool::joinable():

==================
WARNING: ThreadSanitizer: data race (pid=44510)
 Read of size 8 at 0x7b6000003778 by main thread:
   #0 std::__1::shared_ptr<Poco::Event>::get() const obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2835:49 (clickhouse+0x1529e8ac)
   #1 std::__1::shared_ptr<Poco::Event>::operator bool() const obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2851:62 (clickhouse+0x1529e8ac)
   #2 bool std::__1::operator!=<Poco::Event>(std::__1::shared_ptr<Poco::Event> const&, std::nullptr_t) obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3447:30 (clickhouse+0x1529e8ac)
   #3 ThreadFromGlobalPool::joinable() const obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:223:22 (clickhouse+0x1529e8ac)
   #4 DB::ParallelParsingInputFormat::finishAndWait() obj-x86_64-linux-gnu/../src/Processors/Formats/Impl/ParallelParsingInputFormat.h:272:32 (clickhouse+0x1529e8ac)
   #5 DB::ParallelParsingInputFormat::onCancel() obj-x86_64-linux-gnu/../src/Processors/Formats/Impl/ParallelParsingInputFormat.h:137:9 (clickhouse+0x1529de19)
   #6 DB::IProcessor::cancel() obj-x86_64-linux-gnu/../src/Processors/IProcessor.h:237:9 (clickhouse+0x116228ef)
   #7 DB::InputStreamFromInputFormat::cancel(bool) obj-x86_64-linux-gnu/../src/Processors/Formats/InputStreamFromInputFormat.h:31:23 (clickhouse+0x116228ef)
   #8 DB::IBlockInputStream::cancel(bool)::$_2::operator()(DB::IBlockInputStream&) const obj-x86_64-linux-gnu/../src/DataStreams/IBlockInputStream.cpp:285:15 (clickhouse+0x13b1d4ce)
   #9 void DB::IBlockInputStream::forEachChild<DB::IBlockInputStream::cancel(bool)::$_2>(DB::IBlockInputStream::cancel(bool)::$_2&&) obj-x86_64-linux-gnu/../src/DataStreams/IBlockInputStream.h:265:17 (clickhouse+0x13b1d4ce)
   #10 DB::IBlockInputStream::cancel(bool) obj-x86_64-linux-gnu/../src/DataStreams/IBlockInputStream.cpp:283:5 (clickhouse+0x13b1d4ce)
   #11 DB::AsynchronousBlockInputStream::cancel(bool) obj-x86_64-linux-gnu/../src/DataStreams/AsynchronousBlockInputStream.h:84:28 (clickhouse+0x13b16d00)
   #12 DB::Client::sendDataFrom(DB::ReadBuffer&, DB::Block&, DB::ColumnsDescription const&) obj-x86_64-linux-gnu/../programs/client/Client.cpp:1952:36 (clickhouse+0x96bcc37)
   #13 DB::Client::sendData(DB::Block&, DB::ColumnsDescription const&) obj-x86_64-linux-gnu/../programs/client/Client.cpp:1904:17 (clickhouse+0x96b7a3e)
   #14 DB::Client::processInsertQuery() obj-x86_64-linux-gnu/../programs/client/Client.cpp:1816:13 (clickhouse+0x96b2117)
   #15 DB::Client::processParsedSingleQuery(std::__1::optional<bool>) obj-x86_64-linux-gnu/../programs/client/Client.cpp:1675:17 (clickhouse+0x96b0f17)
   #16 DB::Client::processTextAsSingleQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) obj-x86_64-linux-gnu/../programs/client/Client.cpp:1598:9 (clickhouse+0x96ac7fa)
   #17 DB::Client::processQueryText(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) obj-x86_64-linux-gnu/../programs/client/Client.cpp:924:13 (clickhouse+0x969f84a)
   #18 DB::Client::nonInteractive() obj-x86_64-linux-gnu/../programs/client/Client.cpp:913:13 (clickhouse+0x96a02a0)
   #19 DB::Client::mainImpl() obj-x86_64-linux-gnu/../programs/client/Client.cpp:717:13 (clickhouse+0x9699e13)
   #20 DB::Client::main(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&) obj-x86_64-linux-gnu/../programs/client/Client.cpp:301:20 (clickhouse+0x9695ba0)
   #21 Poco::Util::Application::run() obj-x86_64-linux-gnu/../contrib/poco/Util/src/Application.cpp:334:8 (clickhouse+0x18479b2b)
   #22 mainEntryClickHouseClient(int, char**) obj-x86_64-linux-gnu/../programs/client/Client.cpp:2771:23 (clickhouse+0x96875e4)
   #23 main obj-x86_64-linux-gnu/../programs/main.cpp:366:12 (clickhouse+0x95a1559)

 Previous write of size 8 at 0x7b6000003778 by thread T1:
   #0 std::__1::enable_if<(is_move_constructible<Poco::Event*>::value) && (is_move_assignable<Poco::Event*>::value), void>::type std::__1::swap<Poco::Event*>(Poco::Event*&, Poco::Event*&) obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3935:9 (clickhouse+0x1529e8d8)
   #1 std::__1::shared_ptr<Poco::Event>::swap(std::__1::shared_ptr<Poco::Event>&) obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3299:5 (clickhouse+0x1529e8d8)
   #2 std::__1::shared_ptr<Poco::Event>::reset() obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3308:18 (clickhouse+0x1529e8d8)
   #3 ThreadFromGlobalPool::join() obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:211:15 (clickhouse+0x1529e8d8)
   #4 DB::ParallelParsingInputFormat::finishAndWait() obj-x86_64-linux-gnu/../src/Processors/Formats/Impl/ParallelParsingInputFormat.h:273:32 (clickhouse+0x1529e8d8)
   #5 DB::ParallelParsingInputFormat::onCancel() obj-x86_64-linux-gnu/../src/Processors/Formats/Impl/ParallelParsingInputFormat.h:137:9 (clickhouse+0x1529de19)
   #6 DB::IProcessor::cancel() obj-x86_64-linux-gnu/../src/Processors/IProcessor.h:237:9 (clickhouse+0x116228ef)
   #7 DB::InputStreamFromInputFormat::cancel(bool) obj-x86_64-linux-gnu/../src/Processors/Formats/InputStreamFromInputFormat.h:31:23 (clickhouse+0x116228ef)
   #8 DB::IBlockInputStream::read() obj-x86_64-linux-gnu/../src/DataStreams/IBlockInputStream.cpp:80:9 (clickhouse+0x13b1bcb4)
   #9 DB::AsynchronousBlockInputStream::calculate() obj-x86_64-linux-gnu/../src/DataStreams/AsynchronousBlockInputStream.cpp:74:34 (clickhouse+0x13b160b3)
   #10 DB::AsynchronousBlockInputStream::next()::$_0::operator()() const obj-x86_64-linux-gnu/../src/DataStreams/AsynchronousBlockInputStream.cpp:59:9 (clickhouse+0x13b164b4)
   #11 decltype(std::__1::forward<DB::AsynchronousBlockInputStream::next()::$_0&>(fp)()) std::__1::__invoke<DB::AsynchronousBlockInputStream::next()::$_0&>(DB::AsynchronousBlockInputStream::next()::$_0&) obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3676:1 (clickhouse+0x13b164b4)
   #12 void std::__1::__invoke_void_return_wrapper<void>::__call<DB::AsynchronousBlockInputStream::next()::$_0&>(DB::AsynchronousBlockInputStream::next()::$_0&) obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x13b164b4)
   #13 std::__1::__function::__default_alloc_func<DB::AsynchronousBlockInputStream::next()::$_0, void ()>::operator()() obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1608:12 (clickhouse+0x13b164b4)
   #14 void std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<DB::AsynchronousBlockInputStream::next()::$_0, void ()> >(std::__1::__function::__policy_storage const*) obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2089:16 (clickhouse+0x13b164b4)
   #15 std::__1::__function::__policy_func<void ()>::operator()() const obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2221:16 (clickhouse+0x9646750)
   #16 std::__1::function<void ()>::operator()() const obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2560:12 (clickhouse+0x9646750)
   #17 ThreadPoolImpl<ThreadFromGlobalPool>::worker(std::__1::__list_iterator<ThreadFromGlobalPool, void*>) obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:266:17 (clickhouse+0x9646750)
   #18 void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()::operator()() const obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:136:73 (clickhouse+0x9648ba0)
   #19 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&>(fp0)...)) std::__1::__invoke_constexpr<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&...) obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3682:1 (clickhouse+0x9648ba0)
   #20 decltype(auto) std::__1::__apply_tuple_impl<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&, std::__1::tuple<>&>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&, std::__1::__tuple_indices<std::__1::tuple<>&...>) obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1415:1 (clickhouse+0x9648ba0)
   #21 decltype(auto) std::__1::apply<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&, std::__1::tuple<>&>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&) obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1424:1 (clickhouse+0x9648ba0)
   #22 ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'()::operator()() obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:182:13 (clickhouse+0x9648ba0)
   #23 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(fp0)...)) std::__1::__invoke<ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'()&>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...) obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3676:1 (clickhouse+0x9648b01)
   #24 void std::__1::__invoke_void_return_wrapper<void>::__call<ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'()&>(void&&...) obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x9648b01)
   #25 std::__1::__function::__default_alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'(), void ()>::operator()() obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1608:12 (clickhouse+0x9648b01)
   #26 void std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'(), void ()> >(std::__1::__function::__policy_storage const*) obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2089:16 (clickhouse+0x9648b01)
   #27 std::__1::__function::__policy_func<void ()>::operator()() const obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2221:16 (clickhouse+0x9644015)
   #28 std::__1::function<void ()>::operator()() const obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2560:12 (clickhouse+0x9644015)
   #29 ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:266:17 (clickhouse+0x9644015)
   #30 void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()::operator()() const obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:136:73 (clickhouse+0x9647398)
   #31 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(fp0)...)) std::__1::__invoke<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...) obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3676:1 (clickhouse+0x9647398)
   #32 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(std::__1::tuple<void, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>&, std::__1::__tuple_indices<>) obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:280:5 (clickhouse+0x9647398)
   #33 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()> >(void*) obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:291:5 (clickhouse+0x9647398)

 Location is heap block of size 968 at 0x7b6000003400 allocated by main thread:
   #0 operator new(unsigned long) <null> (clickhouse+0x959ea27)
   #1 void* std::__1::__libcpp_operator_new<unsigned long>(unsigned long) obj-x86_64-linux-gnu/../contrib/libcxx/include/new:235:10 (clickhouse+0x1517c125)
   #2 std::__1::__libcpp_allocate(unsigned long, unsigned long) obj-x86_64-linux-gnu/../contrib/libcxx/include/new:261:10 (clickhouse+0x1517c125)
   #3 std::__1::allocator<std::__1::__shared_ptr_emplace<DB::ParallelParsingInputFormat, std::__1::allocator<DB::ParallelParsingInputFormat> > >::allocate(unsigned long) obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:840:38 (clickhouse+0x1517c125)

Both seem completely unrelated to this change. I'm not sure what should be done here. Should I open tickets for both?

@Algunenano
Member Author

Reported both issues. The failing integration tests look flaky.

@Algunenano
Member Author

Rebased on top of master/HEAD.

@KochetovNicolai KochetovNicolai self-assigned this Jul 26, 2021
@KochetovNicolai
Member

It's generally fine.

The problem this had was that there are multiple pipeline executors and they don't have a base class in common

Actually, there is one PipelineExecutor, which already accepts a QueryStatus * (I hope it is correct in most cases :). All the other executors are wrappers over the main PipelineExecutor. So I think it is easier to register the PipelineExecutor (or several) in QueryStatus as soon as we start execution (and deregister it on query finish). We could also (de)register a callback.

@Algunenano
Member Author

Actually, there is one PipelineExecutor, which already accept QueryStatus * (I hope it is correct in most of the cases :). All other executors are wrappers over main PipelineExecutor

The problem I had with that is that we need to call cancel() on the wrapper, not on the internal executor, so that the whole process is cancelled. For example, TCPHandler::processOrdinaryQueryWithProcessors uses a PullingAsyncPipelineExecutor while executeQuery uses a raw PipelineExecutor, so I don't see how to register a PipelineExecutor for both cases.
Am I wrong in understanding that we need to call PullingAsyncPipelineExecutor::cancel() and not its data->executor->cancel() directly (which is the common part)?

So, I think, it is easier to register PipelineExecutor (or several) in QueryStatus as soon as we start execution (and also deregister on query finish)

Maybe it could be done when creating the PipelineExecutor, removing it on finish()? If it's passed correctly (not null for external queries), then we could handle registration and deregistration there (give QueryStatus an array of pointers to PipelineExecutor and have it call cancel() on all of them). Would that be safe?

Another question indirectly related to this: any ideas on how to improve timeout handling in the PipelineExecutor? Currently it's checked only very rarely (at the end), which can be hours in some extreme cases.
One idea I had was to add Pipe::checkTimeLimit(), which calls processor->checkTimeLimit() (if it's an ISourceWithProgress), which in turn checks with its streams, so it goes Pipeline -> Pipe -> processor -> stream; but the problem was where and when to check it. In the TCPHandler I could add it to its loop, but that doesn't work for the HTTPHandler. PostGIS does a similar check during allocations (though it's only a boolean flag check), but I don't know what the best way to do a similar check in ClickHouse would be. Maybe creating a task at the start of the query in a different thread (which could also call cancel)?

@KochetovNicolai
Member

Am I wrong understanding that we need to call PullingAsyncPipelineExecutor::cancel() and not its data->executor->cancel() directly (which is the common part)?

Well, we do have to call PullingAsyncPipelineExecutor::cancel(). Right now it's only needed to flush a queue in the output format, but I think I can change it so that cancelling the internal executor would be enough.

Maybe it could be done when creating the PipelineExecutor and removing it on finish()? If it's passed correctly (not NULL for external queries) then we could handle registration and deregistration there (give QueryStatus an array of pointers to PipelineExecutor and have it call to cancel() on all of them). Would that be safe?

Yes, I had the same idea: use RAII or something similar.

Any ideas on how to improve timeout handling on the PipelineExecutor?

Right now, the timeout is checked in SourceWithProgress for each chunk of data. It is not the best approach; I just implemented it the same way it is done in IBlockInputStream.
I think limits like query execution time should be checked in the PipelineExecutor itself (maybe also other limits like max_rows/bytes_to_read). We would not need a separate thread for that: every executing thread can check the limits itself on each iteration.
I think we need to put those limits into the pipeline (or maybe QueryStatus).
Checking per allocation is probably overkill. We need to wait for the executing threads anyway.

@Algunenano
Member Author

Well, we have to call PullingAsyncPipelineExecutor::cancel() indeed. Now it is only needed to flush a queue in output format, but I think I can do it the way that calling internal executor would be enough.

Looking at the code, it seems that if the internal PipelineExecutor is cancelled, the PullingAsyncPipelineExecutor will be cancelled either in the next pull or in the destructor (if an exception is thrown), so it looks safe.

I'm going to try changing this into an automatic registration in the PipelineExecutor (constructor and destructor, maybe?), keep a list of executors in QueryStatus, and see what happens.

@KochetovNicolai
Member

Looking at the code it seems that if the internal PipelineExecutor is cancelled, the PullingAsyncPipelineExecutor will be cancelled either in the next pull or in the destructor (if an exception is thrown) so it looks safe.

Yes. But we may also need the changes from #26828.

@Algunenano Algunenano force-pushed the kill_better branch 3 times, most recently from 6e5d740 to 89fa2fd Compare July 28, 2021 16:12
@Algunenano
Member Author

Rebased on top of master (includes #26828)

@Algunenano
Member Author

Rebased again on top of master, which should remove some of the failures in CI (01921_datatype_date32). The others seem unrelated too.

Member

@KochetovNicolai KochetovNicolai left a comment


Perfect!

@KochetovNicolai
Member

Integration tests (thread) - flaky


Labels

pr-improvement Pull request with some product improvements
