
Conversation

@furszy
Member

@furszy furszy commented Oct 23, 2025

This was a recent discovery: the general thread pool class created for #26966 integrates
cleanly into the HTTP server. It simplifies the init, shutdown, and request execution logic,
replacing code that was never unit tested with code that is properly unit and fuzz tested.
(Our functional test framework does exercise this RPC interface extensively, which is how
we've been ensuring its correct behavior so far, but that is not ideal.)

This clearly separates responsibilities:
The HTTP server now focuses solely on receiving and dispatching requests, while the ThreadPool
handles concurrency, queuing, and execution.
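
For illustration, here is a minimal sketch (not the PR's actual server code) of what this split looks like from the caller's side, using the Start()/Submit()/Stop() interface shown further down in this thread; the handler and the worker count are hypothetical:

// Sketch only: the server thread merely enqueues work; a pool worker runs it.
#include <util/threadpool.h>

#include <future>
#include <string>

static std::string HandleRequest(const std::string& request)
{
    return "response for " + request; // placeholder for the real dispatch logic
}

int main()
{
    ThreadPool http_pool{"http"};
    http_pool.Start(/*num_workers=*/4); // spawns the labeled worker threads

    // Submit() returns a std::future typed on the callable's return value.
    std::future<std::string> reply{http_pool.Submit([] { return HandleRequest("GET /"); })};
    reply.get();

    http_pool.Stop(); // drains the remaining queued tasks, then joins the workers
}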

This also allows us to experiment with further performance improvements at the task queuing and
execution level, such as a lock-free structure, task prioritization, or other implementation details
like coroutines in the future, without having to deal with HTTP code that lives in a different layer.

Note:
The rationale behind introducing the ThreadPool first is to be able to easily cherry-pick it across different
working branches. Among those that benefit from it are #26966 for the parallelization of the indexes'
initial sync, #31132 for the parallelization of the inputs-fetching procedure, #32061 for the libevent replacement,
and the kernel API #30595 (#30595 (comment)) to avoid blocking validation, among other use cases not publicly available.

Note 2:
I could have created a wrapper around the existing code and replaced the WorkQueue in a subsequent
commit, but it didn’t seem worth the extra commits and review effort. The ThreadPool implements
essentially the same functionality in a more modern and cleaner way.

@DrahtBot
Contributor

DrahtBot commented Oct 23, 2025

The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

Code Coverage & Benchmarks

For details see: https://corecheck.dev/bitcoin/bitcoin/pulls/33689.

Reviews

See the guideline for information on the review process.

Type Reviewers
Concept ACK Eunovo
Stale ACK pinheadmz, sedited, ismaelsadeeq

If your review is incorrectly listed, please copy-paste <!--meta-tag:bot-skip--> into the comment that the bot should ignore.

Conflicts

Reviewers, this pull request conflicts with the following ones:

  • #34032 (util: Add some more Unexpected and Expected methods by maflcko)
  • #32061 (Replace libevent with our own HTTP and socket-handling implementation by pinheadmz)

If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

@pinheadmz
Member

concept ACK :-) will be reviewing this

@laanwj laanwj self-requested a review October 23, 2025 14:49
@laanwj
Member

laanwj commented Oct 23, 2025

Adding myself as I wrote the original shitty code.

@Raimo33
Contributor

Raimo33 commented Oct 24, 2025

Can't we use an already existing open source library instead of reinventing the wheel?

@furszy
Member Author

furszy commented Oct 24, 2025

Can't we use an already existing open source library instead of reinventing the wheel?

That's a good question. It's usually where we all start.
Generally, the project consensus is to avoid introducing new external dependencies (unless they’re maintained by us) to minimize potential attack vectors. This doesn’t mean we should reinvent everything, just that we need to be very careful about what we decide to include.

That being said, for the changes introduced in this PR, one can argue that we're encapsulating, documenting, and unit + fuzz testing code that wasn't covered before, while also improving separation of responsibilities. We're not adding anything more complex or that behaves radically differently from what we currently have.
The nice property of this PR is that it will let us experiment with more complex approaches in the future without having to deal with application-specific code (like the HTTP server code). This also includes learning from other open source libraries for sure.

@sedited
Contributor

sedited commented Oct 27, 2025

Approach ACK

@furszy furszy force-pushed the 2025_threadpool_http_server branch from bb71b0e to 195a962 Compare October 28, 2025 19:08
Member

@ismaelsadeeq ismaelsadeeq left a comment

Concept ACK, thanks for the separation from the initial index PR.

Member

@pinheadmz pinheadmz left a comment

ACK 195a96258f970c384ce180d57e73616904ef5fa1

Built and tested on macos/arm64 and debian/x86. Reviewed each commit and left a few comments.

I also tested the branch against other popular software that consumes the HTTP interface by running their CI:

The http worker threads are clearly labeled and visible in htop!


@furszy furszy force-pushed the 2025_threadpool_http_server branch from 195a962 to f5eca8d Compare October 29, 2025 15:55
@furszy
Member Author

furszy commented Oct 29, 2025

Oh, awesome extensive test and review @pinheadmz! You rock!

Member

@pinheadmz pinheadmz left a comment

ACK f5eca8d

minor changes since last ack


Contributor

@sedited sedited left a comment

ACK f5eca8d

I have fuzzed the thread pool for some time again and did not find any new issues. The previously observed memory leak during fuzzing also no longer appeared. The changes to the HTTP workers look sane to me.

Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

@furszy furszy force-pushed the 2025_threadpool_http_server branch from f5eca8d to 435e8b5 Compare October 30, 2025 01:21
@furszy
Member Author

furszy commented Oct 30, 2025

Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

Pushed.

Contributor

@l0rinc l0rinc left a comment

I have started reviewing this PR but have only finished the first commit (e1eb4cd).

The depth of the tests gives me hope that this transition will be smooth; they cover most of the functionality, even a few corner cases I hadn't thought of!

It turned out, however, that I had more to say about it than anticipated. I didn't even get to reviewing the threadpool properly, just most of the tests.

I know receiving this amount of feedback can be daunting, but we both want this to succeed. I have spent a lot of time meticulously going through the details. I hope you will take it as I meant it: to make absolutely sure this won't cause any problems and that our test suite is rock solid.
I will continue the review, but wanted to make sure you're aware of the progress and thought we can synchronize more often this way (pun intended).

In general I think adding a threadpool can untangle complicated code, but we have to find a balance between IO- and CPU-bound tasks. I found that oversubscription is usually a smaller problem, so we can likely solve the IO/CPU contention problem by creating dedicated ThreadPools for each major task (http, script verification, input fetcher, compaction, etc.), as sketched below. This way it won't be a general resource guardian (which can be a ThreadPool's main purpose), just a tool to avoid the overhead of starting/stopping threads.
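
A rough sketch of that layout, assuming the ThreadPool interface from this PR (the pool names and worker counts here are illustrative, not proposals):

#include <common/system.h> // GetNumCores()
#include <util/threadpool.h>

int main()
{
    // One pool per major subsystem instead of a single shared resource guardian.
    ThreadPool http_pool{"http"};               // mostly IO-bound work
    ThreadPool scriptcheck_pool{"scriptcheck"}; // CPU-bound work
    http_pool.Start(/*num_workers=*/4);
    scriptcheck_pool.Start(GetNumCores());

    // ... each subsystem submits only to its own pool ...

    scriptcheck_pool.Stop();
    http_pool.Stop();
}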

As hinted in the original thread, I find the current structure harder to follow; I would appreciate it if you would consider doing the extraction in smaller steps:

  • first step would be to use the new ThreadPool's structure, but extract the old logic into it;
  • add the tests (unit and fuzz, no need to split it) against the old impl in a separate commit;
  • in a last step swap it out with the new logic, keeping the same structure and tests.

This way we could debug and understand the old code before we jump into the new one - and the tests would guide us in guaranteeing that the behavior stayed basically the same.
I understand that's extra work, but I'm of course willing to help in whatever you need to be able to do a less risky transition.

I glanced quickly at the actual ThreadPool implementation; it looks okay, but as part of my next wave of reviews I want to investigate why we need so much locking, why we're relying on old primitives instead of the C++20 concurrency tools (such as latches and barriers), and whether we can create and destroy the pool via RAII instead of manual start/stops.

Note that I have done an IBD before and after this change to see if everything was still working and I didn't see any regression!

Here are all the changes I made locally while reviewing:
diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
index e8200533cd..052784db37 100644
--- a/src/test/threadpool_tests.cpp
+++ b/src/test/threadpool_tests.cpp
@@ -2,270 +2,208 @@
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
+#include <common/system.h>
+#include <test/util/setup_common.h>
 #include <util/string.h>
 #include <util/threadpool.h>
+#include <util/time.h>
 
 #include <boost/test/unit_test.hpp>
 
-BOOST_AUTO_TEST_SUITE(threadpool_tests)
+#include <latch>
+#include <semaphore>
 
-constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
+using namespace std::chrono;
+constexpr auto TIMEOUT = seconds(120);
 
-template <typename T>
-void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
+void WaitFor(std::span<const std::future<void>> futures)
 {
-    for (size_t i = 0; i < futures.size(); ++i) {
-        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
-            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
-        }
+    for (const auto& f : futures) {
+        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
     }
 }
 
 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
-std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
 {
-    // Per-thread ready promises to ensure all workers are actually blocked
-    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
-    std::vector<std::future<void>> ready_futures;
-    ready_futures.reserve(num_of_threads_to_block);
-    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
-
-    // Fill all workers with blocking tasks
-    std::vector<std::future<void>> blocking_tasks;
-    for (int i = 0; i < num_of_threads_to_block; i++) {
-        std::promise<void>& ready = ready_promises[i];
-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
-            ready.set_value();
-            blocker_future.wait();
-        }));
-    }
+    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
+    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
 
-    // Wait until all threads are actually blocked
-    WaitFor(ready_futures, context);
+    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
+    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
+        ready.count_down();
+        release_sem.acquire();
+    });
+
+    ready.wait();
     return blocking_tasks;
 }
 
-BOOST_AUTO_TEST_CASE(threadpool_basic)
+BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
+
+const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
+
+BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
 {
-    // Test Cases
-    // 0) Submit task to a non-started pool.
-    // 1) Submit tasks and verify completion.
-    // 2) Maintain all threads busy except one.
-    // 3) Wait for work to finish.
-    // 4) Wait for result object.
-    // 5) The task throws an exception, catch must be done in the consumer side.
-    // 6) Busy workers, help them by processing tasks from outside.
-    // 7) Recursive submission of tasks.
-    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
-
-    const int NUM_WORKERS_DEFAULT = 3;
-    const std::string POOL_NAME = "test";
-
-    // Test case 0, submit task to a non-started pool
-    {
-        ThreadPool threadPool(POOL_NAME);
-        bool err = false;
-        try {
-            threadPool.Submit([]() { return false; });
-        } catch (const std::runtime_error&) {
-            err = true;
-        }
-        BOOST_CHECK(err);
+    ThreadPool threadPool{"not_started"};
+    BOOST_CHECK_EXCEPTION(threadPool.Submit([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
+}
+
+BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
+{
+    ThreadPool threadPool{"completion"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+
+    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
+    std::atomic_size_t counter{0};
+
+    std::vector<std::future<void>> futures(num_tasks);
+    for (size_t i{0}; i < num_tasks; ++i) {
+        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
     }
 
-    // Test case 1, submit tasks and verify completion.
-    {
-        int num_tasks = 50;
+    WaitFor(futures);
+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+}
 
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-        std::atomic<int> counter = 0;
+BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
+{
+    ThreadPool threadPool{"block_counts"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
 
-        // Store futures to ensure completion before checking counter.
-        std::vector<std::future<void>> futures;
-        futures.reserve(num_tasks);
+    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
+        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
+        std::counting_semaphore sem{0};
+        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
 
-        for (int i = 1; i <= num_tasks; i++) {
-            futures.emplace_back(threadPool.Submit([&counter, i]() {
-                counter.fetch_add(i);
-            }));
-        }
+        size_t counter{0};
+        std::vector<std::future<void>> futures(num_tasks);
+        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
 
-        // Wait for all tasks to finish
-        WaitFor(futures, /*context=*/"test1 task");
-        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
-        BOOST_CHECK_EQUAL(counter.load(), expected_value);
+        WaitFor(futures);
         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
-    }
 
-    // Test case 2, maintain all threads busy except one.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-        // Single blocking future for all threads
-        std::promise<void> blocker;
-        std::shared_future<void> blocker_future(blocker.get_future());
-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1, /*context=*/"test2 blocking tasks enabled");
-
-        // Now execute tasks on the single available worker
-        // and check that all the tasks are executed.
-        int num_tasks = 15;
-        int counter = 0;
-
-        // Store futures to wait on
-        std::vector<std::future<void>> futures;
-        futures.reserve(num_tasks);
-        for (int i = 0; i < num_tasks; i++) {
-            futures.emplace_back(threadPool.Submit([&counter]() {
-                counter += 1;
-            }));
+        if (free == 1) {
+            BOOST_CHECK_EQUAL(counter, num_tasks);
+        } else {
+            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
         }
 
-        WaitFor(futures, /*context=*/"test2 tasks");
-        BOOST_CHECK_EQUAL(counter, num_tasks);
-
-        blocker.set_value();
-        WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
-        threadPool.Stop();
-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
+        sem.release(free);
+        WaitFor(blocking_tasks);
     }
 
-    // Test case 3, wait for work to finish.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-        std::atomic<bool> flag = false;
-        std::future<void> future = threadPool.Submit([&flag]() {
-            std::this_thread::sleep_for(std::chrono::milliseconds{200});
-            flag.store(true);
-        });
-        future.wait();
-        BOOST_CHECK(flag.load());
-    }
+    threadPool.Stop();
+}
 
-    // Test case 4, obtain result object.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-        std::future<bool> future_bool = threadPool.Submit([]() {
-            return true;
-        });
-        BOOST_CHECK(future_bool.get());
+BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
+{
+    ThreadPool threadPool{"wait_test"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
 
-        std::future<std::string> future_str = threadPool.Submit([]() {
-            return std::string("true");
-        });
-        std::string result = future_str.get();
-        BOOST_CHECK_EQUAL(result, "true");
+    const auto start{steady_clock::now()};
+
+    std::vector<std::future<void>> futures(num_tasks + 1);
+    for (size_t i{0}; i <= num_tasks; ++i) {
+        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
     }
+    WaitFor(futures);
 
-    // Test case 5, throw exception and catch it on the consumer side.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-
-        int ROUNDS = 5;
-        std::string err_msg{"something wrong happened"};
-        std::vector<std::future<void>> futures;
-        futures.reserve(ROUNDS);
-        for (int i = 0; i < ROUNDS; i++) {
-            futures.emplace_back(threadPool.Submit([err_msg, i]() {
-                throw std::runtime_error(err_msg + util::ToString(i));
-            }));
-        }
+    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
+    BOOST_CHECK(elapsed_ms >= num_tasks);
+}
 
-        for (int i = 0; i < ROUNDS; i++) {
-            try {
-                futures.at(i).get();
-                BOOST_FAIL("Expected exception not thrown");
-            } catch (const std::runtime_error& e) {
-                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
-            }
-        }
-    }
+BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
+{
+    ThreadPool threadPool{"result_test"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
 
-    // Test case 6, all workers are busy, help them by processing tasks from outside.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
-
-        std::promise<void> blocker;
-        std::shared_future<void> blocker_future(blocker.get_future());
-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test6 blocking tasks enabled");
-
-        // Now submit tasks and check that none of them are executed.
-        int num_tasks = 20;
-        std::atomic<int> counter = 0;
-        for (int i = 0; i < num_tasks; i++) {
-            threadPool.Submit([&counter]() {
-                counter.fetch_add(1);
-            });
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds{100});
-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
+}
 
-        // Now process manually
-        for (int i = 0; i < num_tasks; i++) {
-            threadPool.ProcessTask();
-        }
-        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
-        blocker.set_value();
-        threadPool.Stop();
-        WaitFor(blocking_tasks, "Failure waiting for test6 blocking task futures");
+BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
+{
+    ThreadPool threadPool{"exception_test"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+
+    const auto err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
+
+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
+    for (size_t i{0}; i < num_tasks; ++i) {
+        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(err(i)); }).get(), std::runtime_error, HasReason{err(i)});
     }
+}
+
+BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
+{
+    ThreadPool threadPool{"manual_process"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
 
-    // Test case 7, recursive submission of tasks.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
+    std::counting_semaphore sem{0};
+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
 
-        std::promise<void> signal;
-        threadPool.Submit([&]() {
-            threadPool.Submit([&]() {
-                signal.set_value();
-            });
-        });
+    std::atomic_size_t counter{0};
+    std::vector<std::future<void>> futures(num_tasks);
+    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
 
-        signal.get_future().wait();
-        threadPool.Stop();
-    }
+    UninterruptibleSleep(milliseconds{100});
+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
 
-    // Test case 8, submit a task when all threads are busy and then stop the pool.
-    {
-        ThreadPool threadPool(POOL_NAME);
-        threadPool.Start(NUM_WORKERS_DEFAULT);
+    for (size_t i{0}; i < num_tasks; ++i) {
+        threadPool.ProcessTask();
+    }
 
-        std::promise<void> blocker;
-        std::shared_future<void> blocker_future(blocker.get_future());
-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test8 blocking tasks enabled");
+    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
 
-        // Submit an extra task that should execute once a worker is free
-        std::future<bool> future = threadPool.Submit([]() { return true; });
+    sem.release(NUM_WORKERS_DEFAULT);
+    WaitFor(blocking_tasks);
+}
 
-        // At this point, all workers are blocked, and the extra task is queued
-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
+BOOST_AUTO_TEST_CASE(recursive_task_submission)
+{
+    ThreadPool threadPool{"recursive"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
 
-        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
-        std::thread thread_unblocker([&blocker]() {
-            std::this_thread::sleep_for(std::chrono::milliseconds{300});
-            blocker.set_value();
+    std::promise<void> signal;
+    threadPool.Submit([&threadPool, &signal] {
+        threadPool.Submit([&signal] {
+            signal.set_value();
         });
+    });
 
-        // Stop the pool while the workers are still blocked
-        threadPool.Stop();
+    signal.get_future().wait();
+}
 
-        // Expect the submitted task to complete
-        BOOST_CHECK(future.get());
-        thread_unblocker.join();
+BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
+{
+    ThreadPool threadPool{"graceful_stop"};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
 
-        // Obviously all the previously blocking tasks should be completed at this point too
-        WaitFor(blocking_tasks, "Failure waiting for test8 blocking task futures");
+    std::counting_semaphore sem{0};
+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
 
-        // Pool should be stopped and no workers remaining
-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
-    }
+    auto future{threadPool.Submit([] { return true; })};
+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
+
+    std::thread thread_unblocker{[&sem] {
+        std::this_thread::sleep_for(milliseconds{300});
+        sem.release(NUM_WORKERS_DEFAULT);
+    }};
+
+    threadPool.Stop();
+
+    BOOST_CHECK(future.get());
+    thread_unblocker.join();
+    WaitFor(blocking_tasks);
+    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
 }
 
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index 5d9884086e..c89fda37c2 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -24,6 +24,8 @@
 #include <utility>
 #include <vector>
 
+#include <tinyformat.h>
+
 /**
  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
  *
@@ -62,16 +64,9 @@ private:
         for (;;) {
             std::packaged_task<void()> task;
             {
-                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
-                if (!m_interrupt && m_work_queue.empty()) {
-                    // Block until the pool is interrupted or a task is available.
-                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
-                }
-
-                // If stopped and no work left, exit worker
-                if (m_interrupt && m_work_queue.empty()) {
-                    return;
-                }
+                // Block until the pool is interrupted or a task is available.
+                m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
+                if (m_interrupt && m_work_queue.empty()) return;
 
                 task = std::move(m_work_queue.front());
                 m_work_queue.pop();
@@ -101,17 +96,16 @@ public:
      *
      * Must be called from a controller (non-worker) thread.
      */
-    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         assert(num_workers > 0);
         LOCK(m_mutex);
         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
         m_interrupt = false; // Reset
 
-        // Create workers
         m_workers.reserve(num_workers);
-        for (int i = 0; i < num_workers; i++) {
-            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
+        for (size_t i{0}; i < num_workers; i++) {
+            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
         }
     }
 
@@ -179,12 +173,6 @@ public:
         task();
     }
 
-    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
-    {
-        WITH_LOCK(m_mutex, m_interrupt = true);
-        m_cv.notify_all();
-    }
-
     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         return WITH_LOCK(m_mutex, return m_work_queue.size());


static void setup_threadpool_test()
{
// Disable logging entirely. It seems to cause memory leaks.
Contributor

Can we investigate that?

Member Author

It is a known issue. The pcp fuzz test does it too for the same reason (in an undocumented manner). I'm only documenting it properly so we don't forget it exists. Feel free to investigate it in a separate issue/PR; I'm not planning to. It is not an issue with the code introduced in this PR.

Contributor

k, thanks, please resolve the comment

// Create workers
m_workers.reserve(num_workers);
for (int i = 0; i < num_workers; i++) {
m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
Contributor

nit: we could use strprintf in a few more places:

Suggested change
m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });

Member Author

I'm not sure about including extra dependencies in a low-level class for tiny readability improvements. But I'm not really opposed to this one, so done as suggested.

void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
// Notify workers and join them.
std::vector<std::thread> threads_to_join;
Contributor

I'm not sure this is necessary.

ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);

int ROUNDS = 5;
Contributor

in other cases we called this num_tasks

Member Author

Done as suggested.


int ROUNDS = 5;
std::string err_msg{"something wrong happened"};
std::vector<std::future<void>> futures;
Contributor

do we need a vector here or can we just consume whatever we just submitted?

BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
{
    ThreadPool threadPool{"exception_test"};
    threadPool.Start(NUM_WORKERS_DEFAULT);

    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};

    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    for (size_t i{0}; i < num_tasks; ++i) {
        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(make_err(i)); }).get(), std::runtime_error, HasReason{make_err(i)});
    }
}

Member Author

do we need a vector here or can we just consume whatever we just submitted?

Consuming right away would wait for the task to be executed; we want to exercise some concurrency too.

Contributor

we want to exercise some concurrency too

You're right, that's better indeed.
My suggestion is updated below to keep the vector:

// Test 5, throw exceptions and catch it on the consumer side
BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
{
    ThreadPool threadPool("exception_test");
    threadPool.Start(NUM_WORKERS_DEFAULT);

    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};

    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    std::vector<std::future<void>> futures(num_tasks);
    for (size_t i{0}; i < num_tasks; ++i) {
        futures[i] = threadPool.Submit([&make_err, i] { throw std::runtime_error(make_err(i)); });
    }

    for (size_t i{0}; i < num_tasks; ++i) {
        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
    }
}

}
}

// Test case 6, all workers are busy, help them by processing tasks from outside.
Contributor

We can split out the remaining 3 tests as well:

BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
{
    ThreadPool threadPool{"manual_process"};
    threadPool.Start(NUM_WORKERS_DEFAULT);
    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};

    std::counting_semaphore sem{0};
    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};

    std::atomic_size_t counter{0};
    std::vector<std::future<void>> futures(num_tasks);
    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });

    UninterruptibleSleep(milliseconds{100});
    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);

    for (size_t i{0}; i < num_tasks; ++i) {
        threadPool.ProcessTask();
    }

    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);

    sem.release(NUM_WORKERS_DEFAULT);
    WaitFor(blocking_tasks);
}

BOOST_AUTO_TEST_CASE(recursive_task_submission)
{
    ThreadPool threadPool{"recursive"};
    threadPool.Start(NUM_WORKERS_DEFAULT);

    std::promise<void> signal;
    threadPool.Submit([&threadPool, &signal] {
        threadPool.Submit([&signal] {
            signal.set_value();
        });
    });

    signal.get_future().wait();
}

BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
{
    ThreadPool threadPool{"graceful_stop"};
    threadPool.Start(NUM_WORKERS_DEFAULT);

    std::counting_semaphore sem{0};
    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};

    auto future{threadPool.Submit([] { return true; })};
    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);

    std::thread thread_unblocker{[&sem] {
        std::this_thread::sleep_for(milliseconds{300});
        sem.release(NUM_WORKERS_DEFAULT);
    }};

    threadPool.Stop();

    BOOST_CHECK(future.get());
    thread_unblocker.join();
    WaitFor(blocking_tasks);
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}

Member Author

Split as suggested.

static const size_t MAX_HEADERS_SIZE = 8192;

/** HTTP request work item */
class HTTPWorkItem final : public HTTPClosure
Contributor

Is there a way to do this final threadpool migration in smaller steps?

Member Author

I guess you wrote this first and then answered yourself in your final comment, which is basically a longer version of the PR's "Note 2" description.

Contributor

Not exactly sure what you mean by that - Note 2 does seem like a good idea to me and it's probably similar to what I meant. It would help the review process to gain more confidence in the implementation if we migrated away in smaller steps, documenting how the tests we add for the old implementation also pass when we switch over to the new implementation.

Member Author

@furszy furszy Nov 17, 2025

I imagine the problem is that you haven't compared the current HTTP code with the ThreadPool code, which is pretty much the same but properly documented and test-covered. That's one of the reasons the underlying sync mechanism was not modernized in this PR, and why I added the "Note 2" paragraph as well as wrote #33689 (comment).

Look at the current WorkQueue:

void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
    {
        while (true) {
            std::unique_ptr<WorkItem> i;
            {
                WAIT_LOCK(cs, lock);
                while (running && queue.empty())
                    cond.wait(lock);
                if (!running && queue.empty())
                    break;
                i = std::move(queue.front());
                queue.pop_front();
            }
            (*i)();
        }
    }

And this is the ThreadPool (stripping all comments)

void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        WAIT_LOCK(m_mutex, wait_lock);
        for (;;) {
            std::packaged_task<void()> task;
            {
                if (!m_interrupt && m_work_queue.empty()) {
                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
                }

                if (m_interrupt && m_work_queue.empty()) return;
               
                task = std::move(m_work_queue.front());
                m_work_queue.pop();
            }

            REVERSE_LOCK(wait_lock, m_mutex);
            task();
        }
    }

In any case, since most reviewers seem ok to proceed with the current approach, I think it’s more productive to keep working on it rather than continue circling around this topic, even if it’s not to your taste.

Contributor

This is a big change; other reviewers can just review the unified view if they don't want smaller commits, but I currently cannot view this in small steps, so I insist that we should split it into smaller changes. I want to help; that's why I'm spending so much time on the details. I think this is a really risky change and I want to make sure it's correct. Let me know how I can help.

@marcofleon
Contributor

marcofleon commented Oct 30, 2025

I've been playing around with the fuzz test and wanted to update here, even if it's still a bit inconclusive.

I'm running into non-reproducible timeouts when using fork with libFuzzer. I'm overusing my cores by a lot (fork=25 plus the 3 workers each), and I'm not yet sure if these timeouts would happen eventually when using fewer cores.

I have "fixed" this by changing notify_one in Submit to notify_all, although I can't say I'm exactly sure why this works. It almost seems like some of the workers aren't being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all all idle workers are woken up with every task, so the thread pool is able to handle the cpu overload more effectively.

edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing.

Contributor

@Eunovo Eunovo left a comment

Concept ACK 435e8b5

The PR looks good already, but I think we can block users from calling ThreadPool::Start() and ThreadPool::Stop() inside worker threads; we can use a thread-local variable to identify worker threads and reject the operation.

* Creates and launches `num_workers` threads that begin executing tasks
* from the queue. If the pool is already started, throws.
*
* Must be called from a controller (non-worker) thread.
Contributor

e1eb4cd:

Can we enforce this rule using a thread-local variable?

Member

I remember that in the past we had tons of issues with thread-local variables. They've been a pain in the ass on some platforms, and we decided not to use them except for one case. Not sure if this is still the case, but if not, this needs to be updated:
https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39


Contributor

There's also our thread_local clang-tidy plugin: https://github.com/bitcoin/bitcoin/tree/master/contrib/devtools/bitcoin-tidy.

I don't see why we would need a thread-local variable with a non-trivial destructor to implement this.

I remember that in the past we had tons of issues with thread-local variables. They've been a pain in the ass on some platforms, and we decided not to use them except for one case. Not sure if this is still the case, but if not, this needs to be updated:
https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39

I think it would be fine in this case because the use is limited. A simple thread-local boolean and assertion should be enough.

I don't have a strong opinion here; I just think it would be better to programmatically enforce the rule that certain functions should not be called from worker threads.
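
A minimal standalone sketch of that guard (hypothetical names, not code from this PR): each worker sets a thread-local flag on entry, and controller-only methods assert that the flag is unset.

#include <cassert>
#include <thread>

static thread_local bool g_is_pool_worker{false};

void Stop() // stands in for a controller-only method like ThreadPool::Stop()
{
    assert(!g_is_pool_worker); // reject calls made from a worker thread
    // ... notify and join workers ...
}

int main()
{
    std::thread worker([] {
        g_is_pool_worker = true; // set once on worker entry
        // calling Stop() from this thread would now trip the assertion
    });
    worker.join();
    Stop(); // fine: the controller thread never sets the flag
}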

Member Author

Can we enforce this rule using a thread-local variable?

As m_workers is guarded, we could enforce it in a simple way:

for (const auto& worker : m_workers) assert(worker.get_id() != std::this_thread::get_id());
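
As a standalone illustration of that check (MiniPool is a hypothetical stand-in; in the PR the loop would sit inside ThreadPool's Stop() while m_mutex is held):

#include <cassert>
#include <thread>
#include <vector>

class MiniPool
{
    std::vector<std::thread> m_workers;

public:
    void Start(int n)
    {
        for (int i = 0; i < n; ++i) m_workers.emplace_back([] { /* run tasks */ });
    }
    void Stop()
    {
        // Reject calls from the pool's own threads: a worker joining itself would deadlock.
        for (const auto& w : m_workers) assert(w.get_id() != std::this_thread::get_id());
        for (auto& w : m_workers) w.join();
        m_workers.clear();
    }
};

int main()
{
    MiniPool pool;
    pool.Start(2);
    pool.Stop(); // safe: called from the controller thread
}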

@furszy furszy force-pushed the 2025_threadpool_http_server branch from 435e8b5 to adb8911 Compare November 17, 2025 14:58
Contributor

@l0rinc l0rinc left a comment

Left a few more comments to help with moving this forward.
I like the new structure more and after the tests and the restructuring are finished, I would like to spend some more time reviewing the ThreadPool as well - but I want to make sure we have a solid foundation first.

BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
return true;
});
Contributor

Continuing the thread from #33689 (comment):

#include <common/system.h>
+#include <test/util/setup_common.h>
#include <util/string.h>
Suggested change
BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
return true;
});
BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});

Member Author

Sorry, I missed fully answering your previous comment related to this change. I didn't apply it, to avoid including setup_common.h, as it comes with way more dependencies than just HasReason().

The idea is to try to reduce large Bitcoin Core repo dependencies in this low-level class, mainly when they do not add much value (this one just lets us remove two lines of code with no behavior change), so this class and its tests can be easily used elsewhere.
We could move HasReason() to another test utils file too, but I don't think it's worth the effort.

Contributor

This is needed in multiple methods here; it would simplify the diff, and it provides a higher-level primitive instead of repeating and reimplementing what we have already built a helper for.
Not sure why we'd want to reduce dependencies here; what's the advantage of that? I value clean code a lot more, especially for a test that doesn't even have performance requirements.
We can of course move HasReason in a separate PR, but I think we can use it here before that as well.

Member Author

Not sure why we'd want to reduce dependencies here; what's the advantage of that?

I wrote it above; the idea is to be able to use this low-level class and tests elsewhere, outside the project, just by pulling a few files without dragging in all our unit test framework machinery, which has tons of other dependencies.
If we were talking about a substantial improvement, I'd agree with you, but here it's just a 2-line diff with no behavior change. And for me, that makes the rationale for including it not very convincing.

Contributor

the idea is to be able to use this low-level class and tests elsewhere

I haven't heard that argument before; why would we care about other projects wanting to copy-paste our code?
Let them refactor, but we should write the best code for our project.

but here it’s just a 2-line diff

It's not; please see my remaining suggestions, many of which haven't been applied yet.


Also, since we're ignoring the return value here, we should likely cast to void here:

Suggested change
BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
return true;
});
BOOST_CHECK_EXCEPTION((void)threadPool.Submit([] { return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});


for (int i = 1; i <= num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter, i]() {
counter.fetch_add(i);
Contributor

Can you please apply this to the other cases as well?

BOOST_AUTO_TEST_SUITE(threadpool_tests)

// General test values
constexpr int NUM_WORKERS_DEFAULT = 3;
Contributor

@l0rinc l0rinc Nov 17, 2025

Continuing the discussion in #33689 (comment), could we randomize this on the call-site, so that we can exercise cases when we have other than 3 workers (to make sure we don't introduce an accidental bias), i.e. in the test it would be something like:

    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};

Member Author

While I like the extra coverage, I’m not fully convinced about adding this bit of non-determinism. That’s why I didn’t include it in the last push. It could make reproducing failures trickier, since we’d be adding another dimension to all tests; core counts behave differently across environments.

Probably a good middle ground could be adding a separate test that picks a random worker count and prints the available core count on failure. That gives us some extra coverage without making the other tests harder to debug, and it makes it clear what’s needed to reproduce those specific failures.

Contributor

It's not about coverage, but rather avoiding bias that we may introduce by hard-coding arbitrary values.

I prefer a situation that is "hard to reproduce" over something that we won't find because of hard-coded values.

core counts behave differently across environments

Exactly, let's exercise those differences; that's exactly what we want from our tests: to ruthlessly try to break the code, right?

Member Author

Exactly, let's exercise those differences, that's exactly what we want from our tests, to ruthlessly try to break the code, right?

We’re going to side-track the discussion a bit, but I’m not sure I completely agree.
Generally, I see unit tests as being meant for correctness, not stress testing. For example, we want to ensure certain behavior is always retained — it’s not about trying to break the code in a non-deterministic way. That’s what fuzz tests are for, where we randomize inputs. I’d also argue that we don’t yet have a proper heavy-load testing framework either.

Still, IIRC, our current fuzzing framework has some limitations; the engine runs the function faster than the OS can release the threads, which causes memory usage to increase non-stop (at least that’s what I remember from a good number of experiments we did a few months ago).

In any case, this is just a general software development sidetrack… sorry, couldn’t contain myself.
We could still have a specific test case for this or dynamically adapt the number of workers if others are happy with that too.

Contributor

We could still have a specific test case for this or dynamically adapt the number of workers

How would that solve the problem I highlighted, namely that 3 is a special value that introduces needless bias?
We could theoretically have a bug that only manifests when the worker count equals (or exceeds) the CPU count (but maybe only happens for exceptions), which would never happen with 3 workers, but would sometimes fail correctly if we randomize instead of hard-coding magic values.
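
For concreteness, one way the randomized middle ground floated above could look as a test case (a sketch assuming the fixture's m_rng and the size_t Start() from the suggested diff; the test name and bounds are made up):

BOOST_AUTO_TEST_CASE(randomized_worker_count)
{
    // Below, at, or above the core count, so core-count-sensitive bugs can surface.
    const auto num_workers{1 + m_rng.randrange<size_t>(2 * size_t(GetNumCores()))};
    BOOST_TEST_MESSAGE("Testing with " << num_workers << " workers on " << GetNumCores() << " cores");

    ThreadPool threadPool{"randomized"};
    threadPool.Start(num_workers);
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), num_workers);
    threadPool.Stop();
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}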

Comment on lines 14 to 39
template <typename T>
void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
{
for (size_t i = 0; i < futures.size(); ++i) {
if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
}
}
}
Contributor

The original failure looks like this:

unknown location:0: fatal error: in "threadpool_tests/submit_tasks_complete_successfully": std::runtime_error: Timeout waiting for: test1 task, task index 0

test/threadpool_tests.cpp:77: last checkpoint: "submit_tasks_complete_successfully" test entry

If you want the extra method to not mask the failure stack, we can make it a macro, like we did with e.g.

#define CHECK_RESULT(result_expression, hss, exp_state, exp_success, exp_request_more, \
exp_headers_size, exp_pow_validated_prev, exp_locator_hash) \
do { \
const auto result{result_expression}; \
BOOST_REQUIRE_EQUAL(hss.GetState(), exp_state); \
BOOST_CHECK_EQUAL(result.success, exp_success); \
BOOST_CHECK_EQUAL(result.request_more, exp_request_more); \
BOOST_CHECK_EQUAL(result.pow_validated_headers.size(), exp_headers_size); \
const std::optional<uint256> pow_validated_prev_opt{exp_pow_validated_prev}; \
if (pow_validated_prev_opt) { \
BOOST_CHECK_EQUAL(result.pow_validated_headers.at(0).hashPrevBlock, pow_validated_prev_opt); \
} else { \
BOOST_CHECK_EQUAL(exp_headers_size, 0); \
} \
const std::optional<uint256> locator_hash_opt{exp_locator_hash}; \
if (locator_hash_opt) { \
BOOST_CHECK_EQUAL(hss.NextHeadersRequestLocator().vHave.at(0), locator_hash_opt); \
} else { \
BOOST_CHECK_EQUAL(exp_state, State::FINAL); \
} \
} while (false)

Which would look like:

#define WAIT_FOR(futures)                                                         \
    do {                                                                          \
        for (const auto& f : futures) {                                           \
            BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
        }                                                                         \
    } while (0)

and a failure stack would be more helpful:

test/threadpool_tests.cpp:96: fatal error: in "threadpool_tests/submit_tasks_complete_successfully": critical check f.wait_for(WAIT_TIMEOUT) != std::future_status::ready has failed

which would point to the exact call site instead of the helper method:

WaitFor(futures, /*context=*/"test1 task");

Comment on lines 117 to 123
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter]() {
counter += 1;
}));
}
Contributor

I still see init + reserve + loop + emplace in most tests; could we apply this to the rest as well?


int ROUNDS = 5;
std::string err_msg{"something wrong happened"};
std::vector<std::future<void>> futures;

we want to exercise some concurrency too

You're right, that's better indeed.
My suggestion, updated to keep the vector:

// Test 5, throw exceptions and catch it on the consumer side
BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
{
    ThreadPool threadPool("exception_test");
    threadPool.Start(NUM_WORKERS_DEFAULT);

    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};

    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    std::vector<std::future<void>> futures(num_tasks);
    for (size_t i{0}; i < num_tasks; ++i) {
        futures[i] = threadPool.Submit([&make_err, i] { throw std::runtime_error(make_err(i)); });
    }

    for (size_t i{0}; i < num_tasks; ++i) {
        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
    }
}

static const size_t MAX_HEADERS_SIZE = 8192;

/** HTTP request work item */
class HTTPWorkItem final : public HTTPClosure

Not exactly sure what you mean by that - Note 2 does seem like a good idea to me and it's probably similar to what I meant. It would help the review process to gain more confidence in the implementation if we migrated away in smaller steps, documenting how the tests we add for the old implementation also pass when we switch over to the new implementation.

@furszy furszy force-pushed the 2025_threadpool_http_server branch from adb8911 to 2de0ce5 Compare November 18, 2025 19:35
@furszy

furszy commented Nov 18, 2025

I've been playing around with the fuzz test and wanted to update here, even if it's still a bit inconclusive.

I'm running into non-reproducible timeouts when using fork with libFuzzer. I'm overusing my cores by a lot (fork=25 plus the 3 workers each), and I'm not yet sure if these timeouts would happen eventually when using fewer cores.

I have "fixed" this by changing notify_one in Submit to notify_all, although I can't say I'm exactly sure why this works. It almost seems like some of the workers aren't being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all all idle workers are woken up with every task, so the thread pool is able to handle the cpu overload more effectively.

edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing.

@marcofleon I don’t think this is an issue. You’re just massively oversubscribing the CPU and lowering the timeout to the point where all the context switching triggers it. Switching to notify_all() only forces all workers awake on every submission, which masks the OS scheduler starvation you get in this kind of extreme setup.
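
For context, the only difference between the two variants is the wake-up call at the end of Submit (a sketch, not the exact patch):

// Variant in the PR: wake a single idle worker per submitted task.
m_work_queue.emplace(std::move(task));
m_cv.notify_one();

// Workaround tried above: wake every idle worker on each submission.
// Under heavy CPU oversubscription this can mask scheduler starvation,
// at the cost of spurious wake-ups on every Submit().
m_cv.notify_all();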

@l0rinc l0rinc left a comment

After the tests, I have reviewed the ThreadPool as well now - first iteration, have a few questions, I expect a few more rounds of review.
I left some modernization suggestions and fine-grained commit requests to make the change easily reviewable.

I still share many of @andrewtoth's concerns regarding RAII vs Start/Stop/Interrupt and I think we can modernize the pool from std::condition_variable with locks to C++20 std::counting_semaphore. Besides my suggestions below I think we should merge Stop/Interrupt maybe via a bool join = true parameter.
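
As a rough sketch of that merge (my assumption of the shape, not a drop-in patch):

// Sketch (assumption): fold Interrupt() into Stop() behind a join flag.
void Stop(bool join = true) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
    std::vector<std::thread> threads_to_join;
    {
        LOCK(m_mutex);
        m_interrupt = true;
        if (join) threads_to_join.swap(m_workers);
    }
    m_cv.notify_all();
    if (!join) return; // former Interrupt(): reject new tasks, keep workers alive
    for (auto& worker : threads_to_join) worker.join();
}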

There's also some duplication between WorkerThread and ProcessTask that we can likely avoid (the tests are passing, so we can either simplify or improve our testing).
There are also a few remaining inconsistencies (I have mentioned a few, repeating some of them here for visibility).
Similarly to before, I have tracked my suggestions locally, here's the patch to simplify checking my suggestions against your local copy:

Threadpool & tests
diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index 6069062abd..c5033462ac 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -253,7 +253,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
     // Dispatch to worker thread
     if (i != iend) {
         if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
-            g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
+            (void)g_threadpool_http.Enqueue([req = std::move(hreq), in_path = std::move(path), fn = i->handler] {
                 fn(req.get(), in_path);
             });
         } else {
diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
index 8cbaf89ddf..56e4fc27e0 100644
--- a/src/test/threadpool_tests.cpp
+++ b/src/test/threadpool_tests.cpp
@@ -3,30 +3,18 @@
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #include <common/system.h>
-#include <util/string.h>
+#include <test/util/setup_common.h>
 #include <util/threadpool.h>
 #include <util/time.h>
 
 #include <boost/test/unit_test.hpp>
 
-// Test Cases Overview
-// 0) Submit task to a non-started pool.
-// 1) Submit tasks and verify completion.
-// 2) Maintain all threads busy except one.
-// 3) Wait for work to finish.
-// 4) Wait for result object.
-// 5) The task throws an exception, catch must be done in the consumer side.
-// 6) Busy workers, help them by processing tasks externally.
-// 7) Recursive submission of tasks.
-// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
-// 9) Congestion test; create more workers than available cores.
-// 10) Ensure Interrupt() prevents further submissions.
-BOOST_AUTO_TEST_SUITE(threadpool_tests)
-
-// General test values
-constexpr int NUM_WORKERS_DEFAULT = 3;
-constexpr char POOL_NAME[] = "test";
-constexpr auto WAIT_TIMEOUT = 120s;
+#include <latch>
+#include <semaphore>
+
+using namespace std::chrono;
+
+constexpr auto WAIT_TIMEOUT{120s};
 
 #define WAIT_FOR(futures)                                                         \
     do {                                                                          \
@@ -37,259 +25,233 @@ constexpr auto WAIT_TIMEOUT = 120s;
 
 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
-std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block)
+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
 {
-    // Per-thread ready promises to ensure all workers are actually blocked
-    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
-    std::vector<std::future<void>> ready_futures;
-    ready_futures.reserve(num_of_threads_to_block);
-    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
-
-    // Fill all workers with blocking tasks
-    std::vector<std::future<void>> blocking_tasks;
-    for (int i = 0; i < num_of_threads_to_block; i++) {
-        std::promise<void>& ready = ready_promises[i];
-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
-            ready.set_value();
-            blocker_future.wait();
-        }));
-    }
-
-    // Wait until all threads are actually blocked
-    WAIT_FOR(ready_futures);
+    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
+    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
+    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
+    for (auto& f : blocking_tasks) f = threadPool.Enqueue([&] {
+        ready.count_down();
+        release_sem.acquire();
+    });
+    ready.wait();
     return blocking_tasks;
 }
 
-// Test 0, submit task to a non-started pool
+BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
+
+// Submit task to a non-started pool
 BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
 {
-    ThreadPool threadPool(POOL_NAME);
-    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-        return true;
-    });
+    ThreadPool threadPool{"not_started"};
+    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
 }
 
-// Test 1, submit tasks and verify completion
+// Submit tasks and verify completion
 BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
 {
-    int num_tasks = 50;
-
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-    std::atomic<int> counter = 0;
-
-    // Store futures to ensure completion before checking counter.
-    std::vector<std::future<void>> futures;
-    futures.reserve(num_tasks);
-    for (int i = 1; i <= num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([&counter, i]() {
-            counter.fetch_add(i, std::memory_order_relaxed);
-        }));
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"completion"};
+    threadPool.Start(num_workers);
+
+    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
+    std::atomic_size_t counter{0};
+
+    std::vector<std::future<void>> futures(num_tasks);
+    for (size_t i{0}; i < num_tasks; ++i) {
+        futures[i] = threadPool.Enqueue([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
     }
 
-    // Wait for all tasks to finish
     WAIT_FOR(futures);
-    int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
-    BOOST_CHECK_EQUAL(counter.load(), expected_value);
+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
 }
 
-// Test 2, maintain all threads busy except one
-BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
+// Block varying numbers of workers and verify remaining workers process all tasks
+BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_tasks)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-    // Single blocking future for all threads
-    std::promise<void> blocker;
-    std::shared_future<void> blocker_future(blocker.get_future());
-    const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1);
-
-    // Now execute tasks on the single available worker
-    // and check that all the tasks are executed.
-    int num_tasks = 15;
-    int counter = 0;
-
-    // Store futures to wait on
-    std::vector<std::future<void>> futures(num_tasks);
-    for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"limited_workers"};
+    threadPool.Start(num_workers);
 
-    WAIT_FOR(futures);
-    BOOST_CHECK_EQUAL(counter, num_tasks);
+    const auto num_tasks{5 + m_rng.randrange<size_t>(20)};
+
+    for (size_t free{1}; free < num_workers; ++free) {
+        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
+        std::counting_semaphore sem{0};
+        const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers - free)};
+
+        size_t counter{0};
+        std::vector<std::future<void>> futures(num_tasks);
+        for (auto& f : futures) f = threadPool.Enqueue([&counter] { ++counter; });
+
+        WAIT_FOR(futures);
+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+
+        if (free == 1) {
+            BOOST_CHECK_EQUAL(counter, num_tasks);
+        } else {
+            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
+        }
+
+        sem.release(num_workers - free);
+        WAIT_FOR(blocking_tasks);
+    }
 
-    blocker.set_value();
-    WAIT_FOR(blocking_tasks);
     threadPool.Stop();
     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
 }
 
-// Test 3, wait for work to finish
+// Wait for work to finish
 BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-    std::atomic<bool> flag = false;
-    std::future<void> future = threadPool.Submit([&flag]() {
-        UninterruptibleSleep(std::chrono::milliseconds{200});
-        flag.store(true, std::memory_order_release);
-    });
-    BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
-    BOOST_CHECK(flag.load(std::memory_order_acquire));
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"wait_test"};
+    threadPool.Start(num_workers);
+
+    const auto num_tasks{1 + m_rng.randrange<size_t>(10)};
+    const auto start{SteadyClock::now()};
+
+    std::vector<std::future<void>> futures(num_tasks + 1);
+    for (size_t i{0}; i <= num_tasks; ++i) {
+        futures[i] = threadPool.Enqueue([i] { UninterruptibleSleep(milliseconds{i}); });
+    }
+
+    WAIT_FOR(futures);
+    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
+    BOOST_CHECK(elapsed_ms >= num_tasks);
 }
 
-// Test 4, obtain result object
+// Obtain result object
 BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-    std::future<bool> future_bool = threadPool.Submit([]() { return true; });
-    BOOST_CHECK(future_bool.get());
-
-    std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
-    std::string result = future_str.get();
-    BOOST_CHECK_EQUAL(result, "true");
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"result_test"};
+    threadPool.Start(num_workers);
+
+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return true; }).get(), true);
+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return 42; }).get(), 42);
+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return std::string{"true"}; }).get(), "true");
 }
 
-// Test 5, throw exception and catch it on the consumer side
+// Throw exception and catch it on the consumer side
 BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-
-    int num_tasks = 5;
-    std::string err_msg{"something wrong happened"};
-    std::vector<std::future<void>> futures;
-    futures.reserve(num_tasks);
-    for (int i = 0; i < num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([err_msg, i]() {
-            throw std::runtime_error(err_msg + util::ToString(i));
-        }));
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"exception_test"};
+    threadPool.Start(num_workers);
+
+    const auto make_err{[](size_t n) { return strprintf("error on thread #%s", n); }};
+    const auto num_tasks{5 + m_rng.randrange<size_t>(15)};
+
+    std::vector<std::future<void>> futures(num_tasks);
+    for (size_t i{0}; i < num_tasks; ++i) {
+        futures[i] = threadPool.Enqueue([&make_err, i] { throw std::runtime_error(make_err(i)); });
     }
 
-    for (int i = 0; i < num_tasks; i++) {
-        BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) {
-            BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
-            return true;
-        });
+    for (size_t i{0}; i < num_tasks; ++i) {
+        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
     }
 }
 
-// Test 6, all workers are busy, help them by processing tasks from outside
+// All workers are busy, help them by processing tasks from outside
 BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-
-    std::promise<void> blocker;
-    std::shared_future<void> blocker_future(blocker.get_future());
-    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
-
-    // Now submit tasks and check that none of them are executed.
-    int num_tasks = 20;
-    std::atomic<int> counter = 0;
-    for (int i = 0; i < num_tasks; i++) {
-        threadPool.Submit([&counter]() {
-            counter.fetch_add(1, std::memory_order_relaxed);
-        });
-    }
-    UninterruptibleSleep(std::chrono::milliseconds{100});
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"manual_process"};
+    threadPool.Start(num_workers);
+
+    std::counting_semaphore sem{0};
+    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
+
+    const auto num_tasks{10 + m_rng.randrange<size_t>(30)};
+    std::atomic_size_t counter{0};
+
+    std::vector<std::future<void>> futures(num_tasks);
+    for (auto& f : futures) f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
+
+    UninterruptibleSleep(100ms);
     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
 
-    // Now process manually
-    for (int i = 0; i < num_tasks; i++) {
-        threadPool.ProcessTask();
-    }
-    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
+    for (size_t i{0}; i < num_tasks; ++i) threadPool.ProcessTask();
+
+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
-    blocker.set_value();
+
+    WAIT_FOR(futures);
+
+    sem.release(num_workers);
     threadPool.Stop();
     WAIT_FOR(blocking_tasks);
 }
 
-// Test 7, submit tasks from other tasks
+// Submit tasks from other tasks
 BOOST_AUTO_TEST_CASE(recursive_task_submission)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"recursive"};
+    threadPool.Start(num_workers);
 
     std::promise<void> signal;
-    threadPool.Submit([&]() {
-        threadPool.Submit([&]() {
-            signal.set_value();
-        });
-    });
+    (void)threadPool.Enqueue([&] { (void)threadPool.Enqueue([&] { signal.set_value(); }); });
 
     signal.get_future().wait();
     threadPool.Stop();
 }
 
-// Test 8, submit task when all threads are busy and then stop the pool
+// Submit task when all threads are busy and then stop the pool
 BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
-
-    std::promise<void> blocker;
-    std::shared_future<void> blocker_future(blocker.get_future());
-    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"graceful_stop"};
+    threadPool.Start(num_workers);
 
-    // Submit an extra task that should execute once a worker is free
-    std::future<bool> future = threadPool.Submit([]() { return true; });
+    std::counting_semaphore sem{0};
+    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
 
-    // At this point, all workers are blocked, and the extra task is queued
+    std::future<bool> future{threadPool.Enqueue([] { return true; })};
     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
 
-    // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
-    std::thread thread_unblocker([&blocker]() {
-        UninterruptibleSleep(std::chrono::milliseconds{300});
-        blocker.set_value();
-    });
+    std::thread thread_unblocker{[&sem, num_workers] {
+        UninterruptibleSleep(300ms);
+        sem.release(num_workers);
+    }};
 
-    // Stop the pool while the workers are still blocked
     threadPool.Stop();
 
-    // Expect the submitted task to complete
     BOOST_CHECK(future.get());
     thread_unblocker.join();
-
-    // Obviously all the previously blocking tasks should be completed at this point too
     WAIT_FOR(blocking_tasks);
-
-    // Pool should be stopped and no workers remaining
     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
 }
 
-// Test 9, more workers than available cores (congestion test)
+// More workers than available cores (congestion test)
 BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
-
-    int num_tasks = 200;
-    std::atomic<int> counter{0};
-
-    std::vector<std::future<void>> futures;
-    futures.reserve(num_tasks);
-    for (int i = 0; i < num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([&counter] {
-            counter.fetch_add(1, std::memory_order_relaxed);
-        }));
+    const auto oversubscribe_factor{2 + m_rng.randrange<int>(3)};
+    ThreadPool threadPool{"congestion"};
+    threadPool.Start(std::max(1, GetNumCores() * oversubscribe_factor));
+
+    const auto num_tasks{100 + m_rng.randrange<size_t>(200)};
+    std::atomic_size_t counter{0};
+
+    std::vector<std::future<void>> futures(num_tasks);
+    for (auto& f : futures) {
+        f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
     }
 
     WAIT_FOR(futures);
-    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
 }
 
-// Test 10, Interrupt() prevents further submissions
+// Interrupt() prevents further submissions
 BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
 {
-    ThreadPool threadPool(POOL_NAME);
-    threadPool.Start(NUM_WORKERS_DEFAULT);
+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
+    ThreadPool threadPool{"interrupt"};
+    threadPool.Start(num_workers);
     threadPool.Interrupt();
-    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-        return true;
-    });
+    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] {}), std::runtime_error, HasReason{"No active workers"});
 }
 
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index b489e34c2f..f224c81b55 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -8,19 +8,13 @@
 #include <sync.h>
 #include <tinyformat.h>
 #include <util/check.h>
-#include <util/string.h>
 #include <util/thread.h>
-#include <util/threadinterrupt.h>
 
-#include <algorithm>
-#include <atomic>
-#include <condition_variable>
-#include <cstddef>
-#include <functional>
+#include <deque>
 #include <future>
-#include <memory>
-#include <queue>
+#include <semaphore>
 #include <stdexcept>
+#include <string>
 #include <thread>
 #include <utility>
 #include <vector>
@@ -46,53 +40,26 @@
  */
 class ThreadPool
 {
-private:
     std::string m_name;
     Mutex m_mutex;
-    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
-    std::condition_variable m_cv;
-    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
-    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
-    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
+    std::deque<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
     bool m_interrupt GUARDED_BY(m_mutex){false};
     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
 
+    std::counting_semaphore<> m_sem{0};
+
     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
-        WAIT_LOCK(m_mutex, wait_lock);
         for (;;) {
-            std::packaged_task<void()> task;
-            {
-                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
-                if (!m_interrupt && m_work_queue.empty()) {
-                    // Block until the pool is interrupted or a task is available.
-                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
-                }
-
-                // If stopped and no work left, exit worker
-                if (m_interrupt && m_work_queue.empty()) {
-                    return;
-                }
-
-                task = std::move(m_work_queue.front());
-                m_work_queue.pop();
-            }
-
-            {
-                // Execute the task without the lock
-                REVERSE_LOCK(wait_lock, m_mutex);
-                task();
-            }
+            m_sem.acquire();
+            if (!ProcessTask()) return;
         }
     }
 
 public:
     explicit ThreadPool(const std::string& name) : m_name(name) {}
 
-    ~ThreadPool()
-    {
-        Stop(); // In case it hasn't been stopped.
-    }
+    ~ThreadPool() { Stop(); }
 
     /**
      * @brief Start worker threads.
@@ -109,7 +76,6 @@ public:
         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
         m_interrupt = false; // Reset
 
-        // Create workers
         m_workers.reserve(num_workers);
         for (int i = 0; i < num_workers; i++) {
             m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
@@ -130,14 +96,15 @@ public:
         std::vector<std::thread> threads_to_join;
         {
             LOCK(m_mutex);
-            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
-            auto id = std::this_thread::get_id();
+            // Ensure `Stop()` isn't called from any worker thread to avoid deadlocks
+            const auto id{std::this_thread::get_id()};
             for (const auto& worker : m_workers) assert(worker.get_id() != id);
-            // Early shutdown to return right away on any concurrent 'Submit()' call
+            // Early shutdown to return right away on any concurrent `Submit()` call
             m_interrupt = true;
             threads_to_join.swap(m_workers);
         }
-        m_cv.notify_all();
+        m_sem.release(threads_to_join.size());
+
         for (auto& worker : threads_to_join) worker.join();
         // Since we currently wait for tasks completion, sanity-check empty queue
         WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
@@ -147,13 +114,13 @@ public:
     /**
      * @brief Enqueues a new task for asynchronous execution.
      *
-     * Returns a `std::future` that provides the task’s result or propagates
+     * Returns a `std::future` that provides the task's result or propagates
      * any exception it throws.
      * Note: Ignoring the returned future requires guarding the task against
      * uncaught exceptions, as they would otherwise be silently discarded.
      */
-    template <class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
-    auto Submit(F&& fn)
+    template <class F>
+    [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Enqueue(F&& fn)
     {
         std::packaged_task task{std::forward<F>(fn)};
         auto future{task.get_future()};
@@ -162,34 +129,34 @@ public:
             if (m_interrupt || m_workers.empty()) {
                 throw std::runtime_error("No active workers; cannot accept new tasks");
             }
-            m_work_queue.emplace(std::move(task));
+            m_work_queue.emplace_back(std::move(task));
         }
-        m_cv.notify_one();
+        m_sem.release();
         return future;
     }
 
     /**
      * @brief Execute a single queued task synchronously.
      * Removes one task from the queue and executes it on the calling thread.
+     * @return true if a task was executed, false if queue was empty
      */
-    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         std::packaged_task<void()> task;
         {
             LOCK(m_mutex);
-            if (m_work_queue.empty()) return;
-
-            // Pop the task
+            if (m_work_queue.empty()) return false;
             task = std::move(m_work_queue.front());
-            m_work_queue.pop();
+            m_work_queue.pop_front();
         }
         task();
+        return true;
     }
 
     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         WITH_LOCK(m_mutex, m_interrupt = true);
-        m_cv.notify_all();
+        m_sem.release(WorkersCount());
     }
 
     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)


for (int i = 1; i <= num_tasks; i++) {
    futures.emplace_back(threadPool.Submit([&counter, i]() {
        counter.fetch_add(i);

Thanks. Currently all fetch_add calls use memory_order_relaxed, which makes sense since we don't want the counter itself to act as a barrier for other data, but the corresponding loads still use the default std::memory_order_seq_cst.
Since the external synchronization (e.g. via the WAIT_FOR calls) already provides the happens-before relationships, I don't think the loads need to be sequentially consistent.
I don't think it's a bug, but it would likely be more consistent if we used the same memory_order_relaxed on the loads as well.
To avoid accidental synchronization, can we use memory_order_relaxed throughout (for fetch_add & load)?
What do you think?
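
A minimal sketch of what that consistency would look like, assuming the WAIT_FOR on the futures is what establishes the happens-before edge:

std::atomic_size_t counter{0};

// Inside the tasks: atomicity is all we need from the counter itself.
counter.fetch_add(1, std::memory_order_relaxed);

// After WAIT_FOR(futures) has synchronized with task completion,
// a relaxed load suffices; the default seq_cst adds no extra guarantee here.
BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);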

BOOST_AUTO_TEST_SUITE(threadpool_tests)

// General test values
constexpr int NUM_WORKERS_DEFAULT = 3;

We could still have a specific test case for this or dynamically adapt the number of workers

How would that solve the problem I highlighted, namely that 3 is a special value that introduces needless bias?
We could theoretically have a bug that only manifests when the worker count equals (or exceeds) the CPU count (but maybe only happens for exceptions), which would never happen with 3 workers, but would sometimes fail correctly if we randomize instead of hard-code magic values.

Comment on lines +12 to +23
// Test Cases Overview
// 0) Submit task to a non-started pool.
// 1) Submit tasks and verify completion.
// 2) Maintain all threads busy except one.
// 3) Wait for work to finish.
// 4) Wait for result object.
// 5) The task throws an exception, catch must be done in the consumer side.
// 6) Busy workers, help them by processing tasks externally.
// 7) Recursive submission of tasks.
// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
// 9) Congestion test; create more workers than available cores.
// 10) Ensure Interrupt() prevents further submissions.

we already have individual test cases, there's no need to duplicate them here

BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
    BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    return true;
});

the idea is to be able to use this low-level class and tests elsewhere

I haven't heard that argument before; why would we care about other projects wanting to copy-paste our code?
Let them refactor, but we should write the best code for our project.

but here it’s just a 2-line diff

It's not; please see my remaining suggestions, many of which haven't been applied yet.


Also, since we're ignoring the return value here, we should likely cast to void here:

Suggested change
BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
    BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    return true;
});
BOOST_CHECK_EXCEPTION((void)threadPool.Submit([] { return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});

/**
* @brief Enqueues a new task for asynchronous execution.
*
* Returns a `std::future` that provides the task’s result or propagates

nit: for consistency with other spelling, e.g.

Stop(); // In case it hasn't been stopped.

Suggested change
* Returns a `std::future` that provides the task’s result or propagates
* Returns a `std::future` that provides the task's result or propagates

Comment on lines +134 to +131
auto id = std::this_thread::get_id();
for (const auto& worker : m_workers) assert(worker.get_id() != id);

tests are passing without these lines - can we cover them?

for (auto& worker : threads_to_join) worker.join();
// Since we currently wait for tasks completion, sanity-check empty queue
WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
// Note: m_interrupt is left true until next Start()

I also find it weird that we can restart a ThreadPool - can we add a Start() → Stop() → Start() unit test, if that is indeed something we want? I understand it may not be trivial, but I would rather we restructure for RAII or some better lifecycle instead.
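
A restart test could be a minimal sketch along these lines (assuming restart is meant to be supported; pool API as used elsewhere in this thread):

BOOST_AUTO_TEST_CASE(restart_after_stop)
{
    ThreadPool threadPool{"restart"};
    threadPool.Start(2);
    BOOST_CHECK(threadPool.Submit([] { return true; }).get());

    threadPool.Stop();
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);

    // Restart: Start() resets m_interrupt, so new tasks must be accepted again.
    threadPool.Start(2);
    BOOST_CHECK(threadPool.Submit([] { return true; }).get());
    threadPool.Stop();
}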

std::vector<std::thread> threads_to_join;
{
LOCK(m_mutex);
// Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks

nit: we've used backticks in other similar cases

Suggested change
// Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
// Ensure `Stop()` isn't called from any worker thread to avoid deadlocks

// Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
auto id = std::this_thread::get_id();
for (const auto& worker : m_workers) assert(worker.get_id() != id);
// Early shutdown to return right away on any concurrent 'Submit()' call

same:

Suggested change
// Early shutdown to return right away on any concurrent 'Submit()' call
// Early shutdown to return right away on any concurrent `Submit()` call

static const size_t MAX_HEADERS_SIZE = 8192;

/** HTTP request work item */
class HTTPWorkItem final : public HTTPClosure

This is a big change. Other reviewers can just review the unified view if they don't want smaller commits, but I currently cannot view this in small steps, so I insist that we split it into smaller changes. I want to help; that's why I'm spending so much time on the details. I think this is a really risky change, and I want to make sure it's correct. Let me know how I can help.

@marcofleon

I don’t think this is an issue. You’re just massively oversubscribing the CPU and lowering the timeout to the point where all the context switching triggers it. Switching to notify_all() only forces all workers awake on every submission, which masks the OS scheduler starvation you get in this kind of extreme setup.

I've been playing with the fuzz test and found that this fixes it for me:

ThreadPool g_pool{"fuzz"};
size_t g_num_workers = 3;
std::atomic<bool> g_pool_started{false};

static void setup_threadpool_test()
{
    LogInstance().DisableLogging();
}

FUZZ_TARGET(threadpool, .init = setup_threadpool_test)
{
    if (!g_pool_started.exchange(true)) {
        g_pool.Start(g_num_workers);
    }
    
    FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());

Oversubscribed by a lot and left it running for a long time with -timeout=10 and it's good to go, no timeouts. This is with notify_one as well.

There's two changes here. The first is to start the thread pool inside of the first iteration. This fixes a problem I was having with the LibAFL version of libfuzzer. It would hang immediately because LibAFL calls fork() after calling the init function, and so the child processes would end up only having one thread active (the main calling thread) and no workers.

The second change is just switching from call_once to a global that we check every iteration. This fixes the original non-reproducible timeout issue I had with libFuzzer. I don't really know why, but I'm guessing call_once is more complicated than the simpler check.
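
For comparison, the replaced one-time initialization presumably looked roughly like this (an assumption, reconstructed from the description; the atomic-flag version is the one shown above):

// Assumed previous approach: one-time start via std::call_once.
static std::once_flag g_pool_once;
std::call_once(g_pool_once, [] { g_pool.Start(g_num_workers); });

// Replacement: a plain atomic flag checked on every iteration.
if (!g_pool_started.exchange(true)) {
    g_pool.Start(g_num_workers);
}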

Overall, I've learned that multi-core fuzzing and multiple threads in a fuzz test don't mix well, it seems. It leads to issues that are hard to reproduce and debug. For the purposes of testing the threadpool under heavy load, I think it's fine to use workarounds like this to get it done.

@l0rinc

l0rinc commented Dec 9, 2025

@furszy, are you still working on this?

@furszy

furszy commented Dec 9, 2025

@furszy, are you still working on this?

yes.

@ismaelsadeeq ismaelsadeeq left a comment

Code review ACK 2de0ce5 👾

@ismaelsadeeq

The rebase is trivial, it seems: master...ismaelsadeeq:bitcoin:12-2025-threadpool

furszy and others added 3 commits December 28, 2025 13:07
Replace the HTTP server's WorkQueue implementation and single-thread
handling code with ThreadPool for processing HTTP requests. The
ThreadPool class encapsulates all this functionality in a reusable
class that is properly unit and fuzz tested (the previous code was
neither unit nor fuzz tested at all).

This cleanly separates responsibilities:
The HTTP server now focuses solely on receiving and dispatching requests,
while ThreadPool handles concurrency, queuing, and execution.
It simplifies init, shutdown and request tracking.

This also allows us to experiment with further performance improvements at
the task queuing and execution level, such as a lock-free structure, task
prioritization or any other performance improvement in the future, without
having to deal with HTTP code that lives on a different layer.
@furszy furszy force-pushed the 2025_threadpool_http_server branch from 2de0ce5 to d504f81 Compare December 28, 2025 18:10
@furszy
Copy link
Member Author

furszy commented Dec 28, 2025

thanks @ismaelsadeeq, rebased. Will tackle the remaining comments and update accordingly soon.
