Skip to content

Commit 70d1f87

Browse files
pitrou authored and cyb70289 committed
ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
Since after-fork reinitialization is triggered when one of the ThreadPool methods is called, it may very well be called from multiple threads at once. Make it thread-safe. Closes #12358 from pitrou/ARROW-15593-thread-pool-race-condition Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Yibo Cai <[email protected]>
1 parent d056829 commit 70d1f87

File tree

5 files changed

+181
-35
lines changed

5 files changed

+181
-35
lines changed

cpp/src/arrow/util/mutex.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
#include <mutex>
2121

22+
#ifndef _WIN32
23+
#include <pthread.h>
24+
#include <atomic>
25+
#endif
26+
2227
#include "arrow/util/logging.h"
2328

2429
namespace arrow {
@@ -50,5 +55,31 @@ Mutex::Guard Mutex::Lock() {
5055

5156
Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}
5257

58+
#ifndef _WIN32
59+
namespace {
60+
61+
// Holder for the mutex handed out by GlobalForkSafeMutex().
// `mutex` starts out null and is only ever assigned by the child-side
// fork handler (AfterFork) that the constructor registers.
struct AfterForkState {
62+
// A global instance that will also register the atfork handler when
63+
// constructed.
64+
static AfterForkState instance;
65+
66+
// The mutex may be used at shutdown, so make it eternal.
67+
// The leak (only in child processes) is a small price to pay for robustness.
68+
Mutex* mutex = nullptr;
69+
70+
private:
71+
// Private constructor: only the static `instance` is meant to exist.
// Registers AfterFork as the child handler; no prepare or parent handler
// is installed. The return value of pthread_atfork() is not checked.
AfterForkState() {
72+
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
73+
}
74+
75+
// Child-side fork handler: stores a freshly allocated Mutex in the singleton.
// Any pointer stored by an earlier fork generation is overwritten without
// being deleted (the deliberate leak described on `mutex`).
static void AfterFork() { instance.mutex = new Mutex; }
76+
};
77+
78+
// Definition of the singleton; constructing it registers the fork handler.
AfterForkState AfterForkState::instance;
79+
} // namespace
80+
81+
// Returns the Mutex stored by the child-side fork handler, or nullptr if that
// handler has not run in this process (the pointer's initial value).
Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
82+
#endif // _WIN32
83+
5384
} // namespace util
5485
} // namespace arrow

cpp/src/arrow/util/mutex.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,26 @@ class ARROW_EXPORT Mutex {
6060
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
6161
};
6262

63+
#ifndef _WIN32
64+
/// Return a pointer to a process-wide, process-specific Mutex that can be used
65+
/// at any point in a child process. NULL is returned when called in the parent.
66+
///
67+
/// The rule is to first check that getpid() corresponds to the parent process pid
68+
/// and, if not, call this function to lock any after-fork reinitialization code.
69+
/// Like this:
70+
///
71+
/// std::atomic<pid_t> pid{getpid()};
72+
/// ...
73+
/// if (pid.load() != getpid()) {
74+
/// // In child process
75+
/// auto lock = GlobalForkSafeMutex()->Lock();
76+
/// if (pid.load() != getpid()) {
77+
/// // Reinitialize internal structures after fork
78+
/// ...
79+
/// pid.store(getpid());
/// }
/// }
///
/// The pid is re-checked after taking the lock so that only the first thread
/// to acquire it performs the reinitialization.
80+
ARROW_EXPORT
81+
Mutex* GlobalForkSafeMutex();
82+
#endif
83+
6384
} // namespace util
6485
} // namespace arrow

cpp/src/arrow/util/thread_pool.cc

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "arrow/util/io_util.h"
3030
#include "arrow/util/logging.h"
31+
#include "arrow/util/mutex.h"
3132

3233
namespace arrow {
3334
namespace internal {
@@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() {
235236
void ThreadPool::ProtectAgainstFork() {
236237
#ifndef _WIN32
237238
pid_t current_pid = getpid();
238-
if (pid_ != current_pid) {
239-
// Reinitialize internal state in child process after fork()
240-
// Ideally we would use pthread_at_fork(), but that doesn't allow
241-
// storing an argument, hence we'd need to maintain a list of all
242-
// existing ThreadPools.
243-
int capacity = state_->desired_capacity_;
244-
245-
auto new_state = std::make_shared<ThreadPool::State>();
246-
new_state->please_shutdown_ = state_->please_shutdown_;
247-
new_state->quick_shutdown_ = state_->quick_shutdown_;
248-
249-
pid_ = current_pid;
250-
sp_state_ = new_state;
251-
state_ = sp_state_.get();
252-
253-
// Launch worker threads anew
254-
if (!state_->please_shutdown_) {
255-
ARROW_UNUSED(SetCapacity(capacity));
239+
if (pid_.load() != current_pid) {
240+
// Reinitialize internal state in child process after fork().
241+
{
242+
// Since after-fork reinitialization is triggered when one of the ThreadPool
243+
// methods is called, it may very well be called from multiple threads
244+
// at once. Therefore, it needs to be guarded with a lock.
245+
auto lock = util::GlobalForkSafeMutex()->Lock();
246+
247+
if (pid_.load() != current_pid) {
248+
int capacity = state_->desired_capacity_;
249+
250+
auto new_state = std::make_shared<ThreadPool::State>();
251+
new_state->please_shutdown_ = state_->please_shutdown_;
252+
new_state->quick_shutdown_ = state_->quick_shutdown_;
253+
254+
sp_state_ = new_state;
255+
state_ = sp_state_.get();
256+
pid_ = current_pid;
257+
258+
// Launch worker threads anew
259+
ARROW_UNUSED(SetCapacity(capacity));
260+
}
256261
}
257262
}
258263
#endif

cpp/src/arrow/util/thread_pool.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
#include <unistd.h>
2222
#endif
2323

24+
#ifndef _WIN32
25+
#include <atomic>
26+
#endif
27+
2428
#include <cstdint>
2529
#include <memory>
2630
#include <queue>
@@ -373,7 +377,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
373377
State* state_;
374378
bool shutdown_on_destroy_;
375379
#ifndef _WIN32
376-
pid_t pid_;
380+
std::atomic<pid_t> pid_;
377381
#endif
378382
};
379383

cpp/src/arrow/util/thread_pool_test.cc

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "arrow/testing/future_util.h"
3737
#include "arrow/testing/gtest_util.h"
3838
#include "arrow/util/io_util.h"
39+
#include "arrow/util/logging.h"
3940
#include "arrow/util/macros.h"
4041
#include "arrow/util/test_common.h"
4142
#include "arrow/util/thread_pool.h"
@@ -610,32 +611,42 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {
610611

611612
#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
612613
defined(THREAD_SANITIZER))
613-
TEST_F(TestThreadPool, ForkSafety) {
614-
pid_t child_pid;
615-
int child_status;
616614

615+
// Test fixture for the fork-safety tests; adds a helper that reaps a forked
// child and verifies that it exited cleanly.
class TestThreadPoolForkSafety : public TestThreadPool {
616+
public:
617+
// Waits for `child_pid` and fails the current test unless the child exited
// normally with status 0.
//
// child_pid: value returned by fork() in the parent; must be > 0
//            (a non-positive value, e.g. a failed fork(), fails the test).
void CheckChildExit(int child_pid) {
618+
ASSERT_GT(child_pid, 0);
619+
int child_status;
620+
// Blocking wait (options = 0) for this specific child.
int got_pid = waitpid(child_pid, &child_status, 0);
621+
ASSERT_EQ(got_pid, child_pid);
622+
// Report death-by-signal separately so the signal number shows up in the log.
if (WIFSIGNALED(child_status)) {
623+
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
624+
}
625+
if (!WIFEXITED(child_status)) {
626+
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
627+
}
628+
// The child-side code in these tests calls std::exit() with 0 on success.
ASSERT_EQ(WEXITSTATUS(child_status), 0);
629+
}
630+
};
631+
632+
TEST_F(TestThreadPoolForkSafety, Basics) {
617633
{
618634
// Fork after task submission
619635
auto pool = this->MakeThreadPool(3);
620636
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
621637
ASSERT_OK_AND_EQ(9, fut.result());
622638

623-
child_pid = fork();
639+
auto child_pid = fork();
624640
if (child_pid == 0) {
625641
// Child: thread pool should be usable
626642
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
627-
if (*fut.result() != 7) {
628-
std::exit(1);
629-
}
643+
ASSERT_FINISHES_OK_AND_EQ(7, fut);
630644
// Shutting down shouldn't hang or fail
631645
Status st = pool->Shutdown();
632646
std::exit(st.ok() ? 0 : 2);
633647
} else {
634648
// Parent
635-
ASSERT_GT(child_pid, 0);
636-
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
637-
ASSERT_TRUE(WIFEXITED(child_status));
638-
ASSERT_EQ(WEXITSTATUS(child_status), 0);
649+
CheckChildExit(child_pid);
639650
ASSERT_OK(pool->Shutdown());
640651
}
641652
}
@@ -644,7 +655,7 @@ TEST_F(TestThreadPool, ForkSafety) {
644655
auto pool = this->MakeThreadPool(3);
645656
ASSERT_OK(pool->Shutdown());
646657

647-
child_pid = fork();
658+
auto child_pid = fork();
648659
if (child_pid == 0) {
649660
// Child
650661
// Spawning a task should return with error (pool was shutdown)
@@ -657,13 +668,87 @@ TEST_F(TestThreadPool, ForkSafety) {
657668
std::exit(0);
658669
} else {
659670
// Parent
660-
ASSERT_GT(child_pid, 0);
661-
ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
662-
ASSERT_TRUE(WIFEXITED(child_status));
663-
ASSERT_EQ(WEXITSTATUS(child_status), 0);
671+
CheckChildExit(child_pid);
664672
}
665673
}
666674
}
675+
676+
// Forks after the pool has been used, then submits tasks to the inherited pool
// from several threads at once in the child.
TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
677+
// ARROW-15593: race condition in after-fork ThreadPool reinitialization
678+
// when SpawnReal() was called from multiple threads in a forked child.
679+
// Child-side body: start n_threads threads that each submit one task to
// `pool`, join them, then check every resulting future.
auto run_in_child = [](ThreadPool* pool) {
680+
const int n_threads = 5;
681+
std::vector<Future<int>> futures;
682+
std::vector<std::thread> threads;
683+
futures.reserve(n_threads);
684+
threads.reserve(n_threads);
685+
686+
// NOTE(review): this lambda captures `futures` by reference and every
// thread calls push_back() on it with no lock visible here; reserve()
// avoids reallocation but concurrent push_back on one std::vector is
// still unsynchronized -- confirm whether a mutex or per-thread slot
// is needed.
auto run_in_thread = [&]() {
687+
auto maybe_fut = pool->Submit(add<int>, 3, 4);
688+
futures.push_back(DeferNotOk(std::move(maybe_fut)));
689+
};
690+
691+
for (int i = 0; i < n_threads; ++i) {
692+
threads.emplace_back(run_in_thread);
693+
}
694+
// All submissions are done once every thread has been joined.
for (auto& thread : threads) {
695+
thread.join();
696+
}
697+
// Each task computes add<int>(3, 4), hence the expected 7.
for (const auto& fut : futures) {
698+
ASSERT_FINISHES_OK_AND_EQ(7, fut);
699+
}
700+
};
701+
702+
{
703+
// Use the pool once in the parent before forking.
auto pool = this->MakeThreadPool(3);
704+
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
705+
ASSERT_OK_AND_EQ(9, fut.result());
706+
707+
auto child_pid = fork();
708+
if (child_pid == 0) {
709+
// Child: spawn tasks from multiple threads at once
710+
run_in_child(pool.get());
711+
// NOTE(review): the child exits with 0 whatever run_in_child() observed;
// presumably a failed assertion inside the lambda only returns from it
// -- confirm that such failures are surfaced to the parent.
std::exit(0);
712+
} else {
713+
// Parent
714+
CheckChildExit(child_pid);
715+
ASSERT_OK(pool->Shutdown());
716+
}
717+
}
718+
}
719+
720+
// Forks twice (parent -> child -> grandchild) and checks that the inherited
// pool accepts and completes a task in each generation.
TEST_F(TestThreadPoolForkSafety, NestedChild) {
721+
{
722+
// Use the pool once in the parent before the first fork.
auto pool = this->MakeThreadPool(3);
723+
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
724+
ASSERT_OK_AND_EQ(9, fut.result());
725+
726+
auto child_pid = fork();
727+
if (child_pid == 0) {
728+
// Child
729+
// Submitted before the second fork; its result (7) is only checked in the
// child branch below, after the grandchild has been reaped.
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
730+
// Fork while the task is running
731+
auto grandchild_pid = fork();
732+
if (grandchild_pid == 0) {
733+
// Grandchild
734+
// `fut` is reassigned here to the grandchild's own task.
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 1, 2));
735+
ASSERT_FINISHES_OK_AND_EQ(3, fut);
736+
ASSERT_OK(pool->Shutdown());
737+
} else {
738+
// Child
739+
CheckChildExit(grandchild_pid);
740+
ASSERT_FINISHES_OK_AND_EQ(7, fut);
741+
ASSERT_OK(pool->Shutdown());
742+
}
743+
// Reached by both the child and the grandchild branch above.
// NOTE(review): exit status is 0 on this path; presumably an assertion
// failure above returns from the test body before this line -- confirm
// how that is reported to the waiting process.
std::exit(0);
744+
} else {
745+
// Parent
746+
CheckChildExit(child_pid);
747+
ASSERT_OK(pool->Shutdown());
748+
}
749+
}
750+
}
751+
667752
#endif
668753

669754
TEST(TestGlobalThreadPool, Capacity) {

0 commit comments

Comments
 (0)