Merge bitcoin/bitcoin#33689: http: replace WorkQueue and single threads handling for ThreadPool

38fd85c676a072ebf256e806beda9d7533790baa http: replace WorkQueue and threads handling for ThreadPool (furszy)
c323f882ed3841401edee90ab5261d68215ab316 fuzz: add test case for threadpool (TheCharlatan)
c528dd5f8ccc3955b00bdba869f0a774efa97fe1 util: introduce general purpose thread pool (furszy)
6354b4fd7fe819eb13274b212e426a7d10ca75d3 tests: log node JSON-RPC errors during test setup (furszy)
45930a79412dc45f9d391cd7689d029fa4f0189e http-server: guard against crashes from unhandled exceptions (furszy)

Pull request description:

  This has been a recent discovery: the general thread pool class created for #26966 cleanly
  integrates into the HTTP server. It simplifies init, shutdown, and request-execution logic.
  Replacing code that was never unit tested with code that is properly unit and fuzz tested.
  Although our functional test framework extensively uses this RPC interface (that’s how
  we’ve been ensuring its correct behavior so far - which is not the best).

  This clearly separates the responsibilities:
  The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles
  concurrency, queuing, and execution.

  This will also allow us to experiment with further performance improvements at the task queuing and
  execution level, such as a lock-free structure or task prioritization or any other implementation detail
  like coroutines in the future, without having to deal with HTTP code that lives on a different layer.

  Note:
  The rationale behind introducing the ThreadPool first is to be able to easily cherry-pick it across different
  working paths. Some of the paths that benefit from it are #26966 for the parallelization of the indexes
  initial sync, #31132 for the parallelization of the inputs fetching procedure, #32061 for the libevent replacement,
  the kernel API #30595 (https://github.com/bitcoin/bitcoin/pull/30595#discussion_r2413702370) to avoid blocking validation among others use cases not publicly available.

  Note 2:
  I could have created a wrapper around the existing code and replaced the `WorkQueue` in a subsequent
  commit, but it didn’t seem worth the extra commits and review effort. The `ThreadPool` implements
  essentially the same functionality in a more modern and cleaner way.

ACKs for top commit:
  Eunovo:
    ReACK 38fd85c676
  sedited:
    Re-ACK 38fd85c676a072ebf256e806beda9d7533790baa
  pinheadmz:
    ACK 38fd85c676a072ebf256e806beda9d7533790baa

Tree-SHA512: a0330e54ed504330ca874c42d4e318a909f548b2fb9ac46db8badf5935b9eec47dc4ed503d1b6f98574418e3473420ea45f60498be05545c4325cfa89dcca689
This commit is contained in:
merge-script 2026-02-11 18:04:17 +01:00
commit 4a05825a3f
No known key found for this signature in database
GPG Key ID: 9B79B45691DB4173
8 changed files with 694 additions and 120 deletions

View File

@ -17,6 +17,7 @@
#include <util/signalinterrupt.h>
#include <util/strencodings.h>
#include <util/threadnames.h>
#include <util/threadpool.h>
#include <util/translation.h>
#include <condition_variable>
@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
/** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192;
/** HTTP request work item: binds a request, its matched path, and the
 * handler to invoke, for deferred execution on a worker thread.
 */
class HTTPWorkItem final : public HTTPClosure
{
public:
    // `_path` is taken by value and moved into place (sink parameter): one
    // copy for lvalue callers (same as before), zero copies for rvalues.
    HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, std::string _path, const HTTPRequestHandler& _func):
        req(std::move(_req)), path(std::move(_path)), func(_func)
    {
    }
    // Invoke the handler with the owned request and the matched path.
    void operator()() override
    {
        func(req.get(), path);
    }

    //! Owned request; public so the dispatcher can reply on it when the
    //! queue rejects the item.
    std::unique_ptr<HTTPRequest> req;

private:
    std::string path;
    HTTPRequestHandler func;
};
/** Simple work queue for distributing work over multiple threads.
* Work items are simply callable objects.
*
* NOTE(review): removed by this change in favor of the general-purpose
* ThreadPool in util/threadpool.h.
*/
template <typename WorkItem>
class WorkQueue
{
private:
Mutex cs;
std::condition_variable cond GUARDED_BY(cs);
// FIFO of pending items, bounded by maxDepth.
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
// Cleared by Interrupt(); workers drain the remaining queue and exit.
bool running GUARDED_BY(cs){true};
const size_t maxDepth;
public:
explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth)
{
}
/** Precondition: worker threads have all stopped (they have been joined).
*/
~WorkQueue() = default;
/** Enqueue a work item.
* Takes ownership of `item` only when returning true; on false
* (queue full or interrupted) the caller keeps ownership.
*/
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
if (!running || queue.size() >= maxDepth) {
return false;
}
queue.emplace_back(std::unique_ptr<WorkItem>(item));
cond.notify_one();
return true;
}
/** Thread function: pop and execute items until interrupted and drained. */
void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
while (true) {
std::unique_ptr<WorkItem> i;
{
WAIT_LOCK(cs, lock);
while (running && queue.empty())
cond.wait(lock);
if (!running && queue.empty())
break;
i = std::move(queue.front());
queue.pop_front();
}
// Execute outside the lock so other workers can pop concurrently.
(*i)();
}
}
/** Interrupt and exit loops */
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
running = false;
cond.notify_all();
}
};
struct HTTPPathHandler
{
HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr;
static struct evhttp* eventHTTP = nullptr;
//! List of subnets to allow RPC connections from
static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
//! Handlers for (sub)paths
static GlobalMutex g_httppathhandlers_mutex;
static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_mutex);
//! Bound listening sockets
static std::vector<evhttp_bound_socket *> boundSockets;
//! Http thread pool - future: encapsulate in HttpContext
static ThreadPool g_threadpool_http("http");
static int g_max_queue_depth{100};
/**
* @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests`
@ -327,14 +252,31 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
// Dispatch to worker thread
if (i != iend) {
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
assert(g_work_queue);
if (g_work_queue->Enqueue(item.get())) {
(void)item.release(); /* if true, queue took ownership */
} else {
if (static_cast<int>(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) {
LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
return;
}
auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
std::string err_msg;
try {
fn(req.get(), in_path);
return;
} catch (const std::exception& e) {
LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what());
err_msg = e.what();
} catch (...) {
LogWarning("Unknown error while processing request for '%s'", req->GetURI());
err_msg = "unknown error";
}
// Reply so the client doesn't hang waiting for the response.
req->WriteHeader("Connection", "close");
// TODO: Implement specific error formatting for the REST and JSON-RPC servers responses.
req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
};
[[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))};
} else {
hreq->WriteReply(HTTP_NOT_FOUND);
}
@ -412,13 +354,6 @@ static bool HTTPBindAddresses(struct evhttp* http)
return !boundSockets.empty();
}
/** Simple wrapper to set thread name and run work queue */
static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue, int worker_num)
{
util::ThreadRename(strprintf("httpworker.%i", worker_num));
queue->Run();
}
/** libevent event log callback */
static void libevent_log_cb(int severity, const char *msg)
{
@ -475,10 +410,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt)
}
LogDebug(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth);
g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
@ -494,17 +428,13 @@ void UpdateHTTPServerLogging(bool enable) {
}
static std::thread g_thread_http;
static std::vector<std::thread> g_thread_http_workers;
void StartHTTPServer()
{
int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads);
g_threadpool_http.Start(rpcThreads);
g_thread_http = std::thread(ThreadHTTP, eventBase);
for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
}
}
void InterruptHTTPServer()
@ -514,21 +444,17 @@ void InterruptHTTPServer()
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
if (g_work_queue) {
g_work_queue->Interrupt();
}
// Interrupt pool after disabling requests
g_threadpool_http.Interrupt();
}
void StopHTTPServer()
{
LogDebug(BCLog::HTTP, "Stopping HTTP server\n");
if (g_work_queue) {
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
for (auto& thread : g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
}
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
g_threadpool_http.Stop();
// Unlisten sockets, these are what make the event loop running, which means
// that after this and all connections are closed the event loop will quit.
for (evhttp_bound_socket *socket : boundSockets) {
@ -556,7 +482,6 @@ void StopHTTPServer()
event_base_free(eventBase);
eventBase = nullptr;
}
g_work_queue.reset();
LogDebug(BCLog::HTTP, "Stopped HTTP server\n");
}

View File

@ -159,15 +159,6 @@ public:
*/
std::optional<std::string> GetQueryParameterFromUri(const char* uri, const std::string& key);
/** Event handler closure.
* Abstract base for callable work items; implementations override
* operator() to perform the work. The virtual destructor allows safe
* deletion through a base-class pointer.
*/
class HTTPClosure
{
public:
virtual void operator()() = 0;
virtual ~HTTPClosure() = default;
};
/** Event class. This can be used either as a cross-thread trigger or as a timer.
*/
class HTTPEvent

View File

@ -107,6 +107,7 @@ add_executable(test_bitcoin
system_ram_tests.cpp
system_tests.cpp
testnet4_miner_tests.cpp
threadpool_tests.cpp
timeoffsets_tests.cpp
torcontrol_tests.cpp
transaction_tests.cpp

View File

@ -120,6 +120,7 @@ add_executable(fuzz
string.cpp
strprintf.cpp
system.cpp
threadpool.cpp
timeoffsets.cpp
torcontrol.cpp
transaction.cpp

View File

@ -0,0 +1,117 @@
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <logging.h>
#include <util/threadpool.h>
#include <test/fuzz/FuzzedDataProvider.h>
#include <test/fuzz/fuzz.h>
#include <atomic>
#include <future>
#include <queue>
// Exception thrown intentionally by fuzz tasks, so the harness can
// distinguish expected failures from unexpected ones.
struct ExpectedException : std::runtime_error {
explicit ExpectedException(const std::string& msg) : std::runtime_error(msg) {}
};
// Task that always throws; the exception is expected to surface through
// the task's future.
struct ThrowTask {
void operator()() const { throw ExpectedException("fail"); }
};
// Task that atomically increments a shared counter when executed.
struct CounterTask {
std::atomic_uint32_t& m_counter;
explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
};
// Block on `future` until it completes. An ExpectedException bumps
// `fail_counter`; any other exception type aborts the fuzzer.
static void GetFuture(std::future<void>& future, uint32_t& fail_counter)
{
    bool unexpected{false};
    try {
        future.get();
    } catch (const ExpectedException&) {
        ++fail_counter;
    } catch (...) {
        unexpected = true;
    }
    assert(!unexpected && "Unexpected exception type");
}
// Global thread pool for fuzzing. Persisting it across iterations prevents
// the excessive thread creation/destruction overhead that can lead to
// instability in the fuzzing environment.
// This is also how we use it in the app's lifecycle.
ThreadPool g_pool{"fuzz"};
// Guards pool start-up so concurrent runners cannot double-start it.
Mutex g_pool_mutex;
// Global to verify we always have the same number of threads.
size_t g_num_workers = 3;
// Start the pool workers once; a no-op when the expected worker count is
// already running.
static void StartPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
{
LOCK(g_pool_mutex);
if (g_pool.WorkersCount() == g_num_workers) return;
g_pool.Start(g_num_workers);
}
// One-time harness initialization, run before any fuzz iteration.
static void setup_threadpool_test()
{
// Disable logging entirely. It seems to cause memory leaks.
LogInstance().DisableLogging();
}
FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
{
// Because LibAFL calls fork() after calling the init setup function,
// the child processes end up having one thread active and no workers.
// To work around this limitation, start thread pool inside the first runner.
StartPoolIfNeeded();
FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
const uint32_t num_tasks = fuzzed_data_provider.ConsumeIntegralInRange<uint32_t>(0, 1024);
// The pool must be idle at the start of every iteration.
assert(g_pool.WorkersCount() == g_num_workers);
assert(g_pool.WorkQueueSize() == 0);
// Counters
std::atomic_uint32_t task_counter{0};
uint32_t fail_counter{0};
uint32_t expected_task_counter{0};
uint32_t expected_fail_tasks{0};
std::queue<std::future<void>> futures;
for (uint32_t i = 0; i < num_tasks; ++i) {
const bool will_throw = fuzzed_data_provider.ConsumeBool();
const bool wait_immediately = fuzzed_data_provider.ConsumeBool();
std::future<void> fut;
if (will_throw) {
expected_fail_tasks++;
fut = g_pool.Submit(ThrowTask{});
} else {
expected_task_counter++;
fut = g_pool.Submit(CounterTask{task_counter});
}
// If caller wants to wait immediately, consume the future here (safe).
if (wait_immediately) {
// Waits for this task to complete immediately; prior queued tasks may also complete
// as they were queued earlier.
GetFuture(fut, fail_counter);
} else {
// Store task for a posterior check
futures.emplace(std::move(fut));
}
}
// Drain remaining futures
while (!futures.empty()) {
auto fut = std::move(futures.front());
futures.pop();
GetFuture(fut, fail_counter);
}
// All tasks completed, with exactly the expected successes and failures.
assert(g_pool.WorkQueueSize() == 0);
assert(task_counter.load() == expected_task_counter);
assert(fail_counter == expected_fail_tasks);
}

View File

@ -0,0 +1,325 @@
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <common/system.h>
#include <logging.h>
#include <random.h>
#include <util/string.h>
#include <util/threadpool.h>
#include <util/time.h>
#include <boost/test/unit_test.hpp>
// General test values
// Worker count used by most tests; randomized per test run in the fixture
// below (always >= 1).
int NUM_WORKERS_DEFAULT = 0;
constexpr char POOL_NAME[] = "test";
// Generous bound so slow CI machines don't produce spurious timeouts.
constexpr auto WAIT_TIMEOUT = 120s;
// Per-test fixture: picks a random worker count in [1, GetNumCores()] and
// logs it so failures are reproducible.
struct ThreadPoolFixture {
ThreadPoolFixture() {
NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1;
LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
}
};
// Test Cases Overview
// 0) Submit task to a non-started pool.
// 1) Submit tasks and verify completion.
// 2) Maintain all threads busy except one.
// 3) Wait for work to finish.
// 4) Wait for result object.
// 5) The task throws an exception, catch must be done in the consumer side.
// 6) Busy workers, help them by processing tasks externally.
// 7) Recursive submission of tasks.
// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
// 9) Congestion test; create more workers than available cores.
// 10) Ensure Interrupt() prevents further submissions.
BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
// Require that every future in `futures` becomes ready within WAIT_TIMEOUT.
#define WAIT_FOR(futures) \
do { \
for (const auto& f : futures) { \
BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
} \
} while (0)
// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::shared_future<void>& blocker_future, int num_of_threads_to_block)
{
// Per-thread ready promises to ensure all workers are actually blocked
std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
std::vector<std::future<void>> ready_futures;
ready_futures.reserve(num_of_threads_to_block);
for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
// Fill all workers with blocking tasks
std::vector<std::future<void>> blocking_tasks;
for (int i = 0; i < num_of_threads_to_block; i++) {
std::promise<void>& ready = ready_promises[i];
blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
ready.set_value();
blocker_future.wait();
}));
}
// Wait until all threads are actually blocked
WAIT_FOR(ready_futures);
return blocking_tasks;
}
// Test 0, submit task to a non-started pool
BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
{
ThreadPool threadPool(POOL_NAME);
// Submit() must throw while no workers exist.
BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
return true;
});
}
// Test 1, submit tasks and verify completion
BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
{
int num_tasks = 50;
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic<int> counter = 0;
// Store futures to ensure completion before checking counter.
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 1; i <= num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter, i]() {
counter.fetch_add(i, std::memory_order_relaxed);
}));
}
// Wait for all tasks to finish
WAIT_FOR(futures);
int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
BOOST_CHECK_EQUAL(counter.load(), expected_value);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
}
// Test 2, maintain all threads busy except one
BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
// Single blocking future for all threads
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1);
// Now execute tasks on the single available worker
// and check that all the tasks are executed.
int num_tasks = 15;
// Non-atomic counter is OK here: only one worker is free to run these
// tasks, and the future waits below synchronize the final read.
int counter = 0;
// Store futures to wait on
std::vector<std::future<void>> futures(num_tasks);
for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
WAIT_FOR(futures);
BOOST_CHECK_EQUAL(counter, num_tasks);
blocker.set_value();
WAIT_FOR(blocking_tasks);
threadPool.Stop();
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
// Test 3, wait for work to finish
BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic<bool> flag = false;
std::future<void> future = threadPool.Submit([&flag]() {
UninterruptibleSleep(200ms);
flag.store(true, std::memory_order_release);
});
BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
BOOST_CHECK(flag.load(std::memory_order_acquire));
}
// Test 4, obtain result object
BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
// Futures carry typed results (bool, string) back from the workers.
std::future<bool> future_bool = threadPool.Submit([]() { return true; });
BOOST_CHECK(future_bool.get());
std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
std::string result = future_str.get();
BOOST_CHECK_EQUAL(result, "true");
}
// Test 5, throw exception and catch it on the consumer side
BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
int num_tasks = 5;
std::string err_msg{"something wrong happened"};
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(threadPool.Submit([err_msg, i]() {
throw std::runtime_error(err_msg + util::ToString(i));
}));
}
// Each future must rethrow its own task's exception with the right message.
for (int i = 0; i < num_tasks; i++) {
BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
return true;
});
}
}
// Test 6, all workers are busy, help them by processing tasks from outside
BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
// Now submit tasks and check that none of them are executed.
int num_tasks = 20;
std::atomic<int> counter = 0;
for (int i = 0; i < num_tasks; i++) {
(void)threadPool.Submit([&counter]() {
counter.fetch_add(1, std::memory_order_relaxed);
});
}
UninterruptibleSleep(100ms);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
// Now process manually, draining the queue from the test thread.
for (int i = 0; i < num_tasks; i++) {
threadPool.ProcessTask();
}
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
blocker.set_value();
threadPool.Stop();
WAIT_FOR(blocking_tasks);
}
// Test 7, submit tasks from other tasks
BOOST_AUTO_TEST_CASE(recursive_task_submission)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> signal;
// The outer task submits an inner task from a worker thread; the inner
// task signals completion back to the test thread.
(void)threadPool.Submit([&]() {
(void)threadPool.Submit([&]() {
signal.set_value();
});
});
signal.get_future().wait();
threadPool.Stop();
}
// Test 8, submit task when all threads are busy and then stop the pool
BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
// Submit an extra task that should execute once a worker is free
std::future<bool> future = threadPool.Submit([]() { return true; });
// At this point, all workers are blocked, and the extra task is queued
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
// Wait a short moment before unblocking the threads to mimic a concurrent shutdown
std::thread thread_unblocker([&blocker]() {
UninterruptibleSleep(300ms);
blocker.set_value();
});
// Stop the pool while the workers are still blocked
threadPool.Stop();
// Expect the submitted task to complete
BOOST_CHECK(future.get());
thread_unblocker.join();
// Obviously all the previously blocking tasks should be completed at this point too
WAIT_FOR(blocking_tasks);
// Pool should be stopped and no workers remaining
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
// Test 9, more workers than available cores (congestion test)
BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
int num_tasks = 200;
std::atomic<int> counter{0};
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(threadPool.Submit([&counter] {
counter.fetch_add(1, std::memory_order_relaxed);
}));
}
// All tasks must still complete exactly once under contention.
WAIT_FOR(futures);
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
}
// Test 10, Interrupt() prevents further submissions
BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
{
// 1) Interrupt from main thread
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
threadPool.Interrupt();
BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
return true;
});
// Reset pool
threadPool.Stop();
// 2) Interrupt() from a worker thread
// One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
threadPool.Start(/*num_workers=*/3);
std::atomic<int> counter{0};
std::promise<void> blocker;
const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1);
// The interrupting task itself still runs to completion.
threadPool.Submit([&threadPool, &counter]{
threadPool.Interrupt();
counter.fetch_add(1, std::memory_order_relaxed);
}).get();
blocker.set_value(); // unblock worker
BOOST_CHECK_EQUAL(counter.load(), 1);
threadPool.Stop();
WAIT_FOR(blocking_tasks);
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
BOOST_AUTO_TEST_SUITE_END()

211
src/util/threadpool.h Normal file
View File

@ -0,0 +1,211 @@
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_THREADPOOL_H
#define BITCOIN_UTIL_THREADPOOL_H
#include <sync.h>
#include <tinyformat.h>
#include <util/check.h>
#include <util/thread.h>
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <future>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
/**
* @brief Fixed-size thread pool for running arbitrary tasks concurrently.
*
* The thread pool maintains a set of worker threads that consume and execute
* tasks submitted through Submit(). Once started, tasks can be queued and
* processed asynchronously until Stop() is called.
*
* ### Thread-safety and lifecycle
* - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
* Calling `Stop()` from a worker thread will deadlock, as it waits for all
* workers to join, including the current one.
*
* - `Submit()` can be called from any thread, including workers. It safely
* enqueues new work for execution as long as the pool has active workers.
*
* - `Interrupt()` stops new task submission and lets queued ones drain
* in the background. Callers can continue other shutdown steps and call
* Stop() at the end to ensure no remaining tasks are left to execute.
*
* - `Stop()` prevents further task submission and blocks until all the
* queued ones are completed.
*/
class ThreadPool
{
private:
std::string m_name;
Mutex m_mutex;
// Pending tasks. Submit() type-erases each packaged_task<R()> into a
// packaged_task<void()> wrapper; the caller's future stays tied to the
// inner task, so results and exceptions still propagate to the caller.
std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
std::condition_variable m_cv;
// Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
// This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
// Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
bool m_interrupt GUARDED_BY(m_mutex){false};
std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
// Worker main loop: pop-and-run tasks until interrupted and the queue is
// drained. The lock is held only around queue access, never during task
// execution.
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
WAIT_LOCK(m_mutex, wait_lock);
for (;;) {
std::packaged_task<void()> task;
{
// Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
if (!m_interrupt && m_work_queue.empty()) {
// Block until the pool is interrupted or a task is available.
m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
}
// If stopped and no work left, exit worker
if (m_interrupt && m_work_queue.empty()) {
return;
}
task = std::move(m_work_queue.front());
m_work_queue.pop();
}
{
// Execute the task without the lock
REVERSE_LOCK(wait_lock, m_mutex);
// Exceptions thrown by the task are captured into its shared state
// (std::packaged_task semantics); they cannot escape this thread.
task();
}
}
}
public:
explicit ThreadPool(const std::string& name) : m_name(name) {}
~ThreadPool()
{
Stop(); // In case it hasn't been stopped.
}
/**
* @brief Start worker threads.
*
* Creates and launches `num_workers` threads that begin executing tasks
* from the queue. If the pool is already started, throws.
*
* Must be called from a controller (non-worker) thread.
*/
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
assert(num_workers > 0);
LOCK(m_mutex);
if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
m_interrupt = false; // Reset
// Create workers
m_workers.reserve(num_workers);
for (int i = 0; i < num_workers; i++) {
m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
}
}
/**
* @brief Stop all worker threads and wait for them to exit.
*
* Sets the interrupt flag, wakes all waiting workers, and joins them.
* Any remaining tasks in the queue will be processed before returning.
*
* Must be called from a controller (non-worker) thread.
*/
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
// Notify workers and join them
std::vector<std::thread> threads_to_join;
{
LOCK(m_mutex);
// Ensure Stop() is not called from a worker thread while workers are still registered,
// otherwise a self-join deadlock would occur.
auto id = std::this_thread::get_id();
for (const auto& worker : m_workers) assert(worker.get_id() != id);
// Early shutdown to return right away on any concurrent Submit() call
m_interrupt = true;
threads_to_join.swap(m_workers);
}
// Join outside the lock so workers can keep draining the queue.
m_cv.notify_all();
for (auto& worker : threads_to_join) worker.join();
// Since we currently wait for tasks completion, sanity-check empty queue
WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
// Note: m_interrupt is left true until next Start()
}
/**
* @brief Enqueues a new task for asynchronous execution.
*
* Returns a `std::future` that provides the task's result or propagates
* any exception it throws.
* Note: Ignoring the returned future requires guarding the task against
* uncaught exceptions, as they would otherwise be silently discarded.
*
* Throws if the pool is interrupted or has no workers.
*/
template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
auto Submit(F&& fn)
{
std::packaged_task task{std::forward<F>(fn)};
auto future{task.get_future()};
{
LOCK(m_mutex);
if (m_interrupt || m_workers.empty()) {
throw std::runtime_error("No active workers; cannot accept new tasks");
}
m_work_queue.emplace(std::move(task));
}
// Wake one waiting worker, after the lock has been released.
m_cv.notify_one();
return future;
}
/**
* @brief Execute a single queued task synchronously.
* Removes one task from the queue and executes it on the calling thread.
* No-op when the queue is empty.
*/
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
std::packaged_task<void()> task;
{
LOCK(m_mutex);
if (m_work_queue.empty()) return;
// Pop the task
task = std::move(m_work_queue.front());
m_work_queue.pop();
}
// Run outside the lock, like the workers do.
task();
}
/**
* @brief Stop accepting new tasks and begin asynchronous shutdown.
*
* Wakes all worker threads so they can drain the queue and exit.
* Unlike Stop(), this function does not wait for threads to finish.
*/
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
WITH_LOCK(m_mutex, m_interrupt = true);
m_cv.notify_all();
}
//! Number of tasks currently queued (tasks already popped for execution
//! are not counted).
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
return WITH_LOCK(m_mutex, return m_work_queue.size());
}
//! Number of worker threads currently registered.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
return WITH_LOCK(m_mutex, return m_workers.size());
}
};
#endif // BITCOIN_UTIL_THREADPOOL_H

View File

@ -147,6 +147,9 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
except subprocess.CalledProcessError as e:
self.log.exception(f"Called Process failed with stdout='{e.stdout}'; stderr='{e.stderr}';")
self.success = TestStatus.FAILED
except JSONRPCException as e:
self.log.exception(f"Failure during setup: error={e.error}, http_status={e.http_status}")
self.success = TestStatus.FAILED
except BaseException:
self.log.exception("Unexpected exception")
self.success = TestStatus.FAILED