Skip to content

Commit 69b3955

Browse files
tavplubixakuzm
authored and committed
Wait for jobs to finish on exception (fixes rare segfaults) (#7350)
(cherry picked from commit 0046b9f)
1 parent 38aece6 commit 69b3955

29 files changed

+154
-103
lines changed

dbms/programs/benchmark/Benchmark.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,15 +274,24 @@ class Benchmark : public Poco::Util::Application
274274
pcg64 generator(randomSeed());
275275
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
276276

277-
for (size_t i = 0; i < concurrency; ++i)
277+
try
278278
{
279-
EntryPtrs connection_entries;
280-
connection_entries.reserve(connections.size());
279+
for (size_t i = 0; i < concurrency; ++i)
280+
{
281+
EntryPtrs connection_entries;
282+
connection_entries.reserve(connections.size());
281283

282-
for (const auto & connection : connections)
283-
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
284+
for (const auto & connection : connections)
285+
connection_entries.emplace_back(std::make_shared<Entry>(
286+
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
284287

285-
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
288+
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
289+
}
290+
}
291+
catch (...)
292+
{
293+
pool.wait();
294+
throw;
286295
}
287296

288297
InterruptListener interrupt_listener;

dbms/programs/copier/ClusterCopier.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ class ClusterCopier
895895
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
896896

897897
for (const TaskShardPtr & task_shard : task_table.all_shards)
898-
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
898+
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
899899

900900
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
901901
thread_pool.wait();
@@ -2038,7 +2038,7 @@ class ClusterCopier
20382038
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
20392039

20402040
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
2041-
thread_pool.schedule([=] { do_for_shard(shard_index); });
2041+
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
20422042

20432043
thread_pool.wait();
20442044
}

dbms/programs/server/TCPHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
565565
auto executor = pipeline.execute();
566566
std::atomic_bool exception = false;
567567

568-
pool.schedule([&]()
568+
pool.scheduleOrThrowOnError([&]()
569569
{
570570
/// ThreadStatus thread_status;
571571

dbms/src/Common/ThreadPool.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
121121
}
122122

123123
template <typename Thread>
124-
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
124+
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
125125
{
126126
scheduleImpl<void>(std::move(job), priority, std::nullopt);
127127
}
128128

129129
template <typename Thread>
130-
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
130+
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
131131
{
132132
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
133133
}

dbms/src/Common/ThreadPool.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,23 @@ class ThreadPoolImpl
3636
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
3737

3838
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
39-
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
39+
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
40+
/// and exception will be cleared.
41+
/// Also throws an exception if cannot create thread.
4042
/// Priority: greater is higher.
41-
void schedule(Job job, int priority = 0);
43+
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
44+
/// located on the stack of the current thread, the stack must not be unwound until all jobs have finished. However,
45+
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
46+
void scheduleOrThrowOnError(Job job, int priority = 0);
4247

43-
/// Wait for specified amount of time and schedule a job or return false.
44-
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
48+
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
49+
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
4550

46-
/// Wait for specified amount of time and schedule a job or throw an exception.
51+
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
4752
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
4853

4954
/// Wait for all currently active jobs to be done.
50-
/// You may call schedule and wait many times in arbitary order.
55+
/// You may call schedule and wait many times in arbitrary order.
5156
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
5257
/// and exception will be cleared.
5358
void wait();
@@ -140,7 +145,7 @@ class ThreadFromGlobalPool
140145
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
141146
: state(std::make_shared<Poco::Event>())
142147
{
143-
/// NOTE: If this will throw an exception, the descructor won't be called.
148+
/// NOTE: If this will throw an exception, the destructor won't be called.
144149
GlobalThreadPool::instance().scheduleOrThrow([
145150
state = state,
146151
func = std::forward<Function>(func),

dbms/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
2121
ThreadPool pool(num_threads);
2222

2323
for (size_t i = 0; i < num_jobs; ++i)
24-
pool.schedule(worker);
24+
pool.scheduleOrThrowOnError(worker);
2525

2626
constexpr size_t num_waiting_threads = 4;
2727

2828
ThreadPool waiting_pool(num_waiting_threads);
2929

3030
for (size_t i = 0; i < num_waiting_threads; ++i)
31-
waiting_pool.schedule([&pool]{ pool.wait(); });
31+
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });
3232

3333
waiting_pool.wait();
3434
}

dbms/src/Common/tests/gtest_thread_pool_global_full.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ TEST(ThreadPool, GlobalFull1)
2929
ThreadPool pool(num_jobs);
3030

3131
for (size_t i = 0; i < capacity; ++i)
32-
pool.schedule(func);
32+
pool.scheduleOrThrowOnError(func);
3333

3434
for (size_t i = capacity; i < num_jobs; ++i)
3535
{
36-
EXPECT_THROW(pool.schedule(func), DB::Exception);
36+
EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
3737
++counter;
3838
}
3939

@@ -66,10 +66,10 @@ TEST(ThreadPool, GlobalFull2)
6666

6767
ThreadPool pool(capacity, 0, capacity);
6868
for (size_t i = 0; i < capacity; ++i)
69-
pool.schedule(func);
69+
pool.scheduleOrThrowOnError(func);
7070

7171
ThreadPool another_pool(1);
72-
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
72+
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
7373

7474
++counter;
7575

@@ -78,7 +78,7 @@ TEST(ThreadPool, GlobalFull2)
7878
global_pool.wait();
7979

8080
for (size_t i = 0; i < capacity; ++i)
81-
another_pool.schedule([&] { ++counter; });
81+
another_pool.scheduleOrThrowOnError([&] { ++counter; });
8282

8383
another_pool.wait();
8484
EXPECT_EQ(counter, capacity * 2 + 1);

dbms/src/Common/tests/gtest_thread_pool_limit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ int test()
1414

1515
std::atomic<int> counter{0};
1616
for (size_t i = 0; i < 10; ++i)
17-
pool.schedule([&]{ ++counter; });
17+
pool.scheduleOrThrowOnError([&]{ ++counter; });
1818
pool.wait();
1919

2020
return counter;

dbms/src/Common/tests/gtest_thread_pool_loop.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
1414
size_t threads = 16;
1515
ThreadPool pool(threads);
1616
for (size_t j = 0; j < threads; ++j)
17-
pool.schedule([&]{ ++res; });
17+
pool.scheduleOrThrowOnError([&] { ++res; });
1818
pool.wait();
1919
}
2020

dbms/src/Common/tests/gtest_thread_pool_schedule_exception.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ bool check()
99
{
1010
ThreadPool pool(10);
1111

12-
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
12+
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });
1313

1414
try
1515
{
1616
for (size_t i = 0; i < 100; ++i)
17-
pool.schedule([]{}); /// An exception will be rethrown from this method.
17+
pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
1818
}
1919
catch (const std::runtime_error &)
2020
{

0 commit comments

Comments
 (0)