From 45930a79412dc45f9d391cd7689d029fa4f0189e Mon Sep 17 00:00:00 2001 From: furszy Date: Wed, 28 Jan 2026 12:40:02 -0500 Subject: [PATCH 1/5] http-server: guard against crashes from unhandled exceptions Currently, if an exception is thrown at the top-level HTTP request handler (prior to invoking the command), the program crashes. Ideally, each handler should catch all exceptions internally and be responsible for sanitizing them and crafting the client response. This is because only the handler knows the correct response format, which differs per server type. However, because this cannot always be guaranteed, it is safer to also catch exceptions in the top-level server code, log the unexpected error, and disconnect the socket. This both guards against crashes caused by uncaught exceptions and prevents the client from hanging indefinitely while waiting for a response that will never arrive. The following diff can be used to trigger the crash in master (just run single node functional tests like feature_shutdown.py): ``` diff --git a/src/httprpc.cpp b/src/httprpc.cpp --- a/src/httprpc.cpp +++ b/src/httprpc.cpp @@ -103,6 +103,9 @@ static bool HTTPReq_JSONRPC(const std::any& context, HTTPRequest* req) { + static int i = 0; // skip initial requests as they are used in the RPC warmup phase. + if (i++ > 3) throw std::runtime_error("error from json rpc handler"); + // JSONRPC handles only POST if (req->GetRequestMethod() != HTTPRequest::POST) { req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests"); ``` Note: This leaves a TODO in the code because error responses should eventually be specialized per server type. REST clients expect plain text responses, while JSON-RPC clients expect a JSON error object. The TODO is there because this is not consistently enforced everywhere in the current codebase, and we should tackle them all at once. --- src/httpserver.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 71c6f5b1ee2..61df454af8e 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -327,7 +327,24 @@ static void http_request_cb(struct evhttp_request* req, void* arg) // Dispatch to worker thread if (i != iend) { - std::unique_ptr item(new HTTPWorkItem(std::move(hreq), path, i->handler)); + auto item = std::make_unique(std::move(hreq), path, [fn = i->handler](HTTPRequest* req, const std::string& path_inner) { + std::string err_msg; + try { + return fn(req, path_inner); + } catch (const std::exception& e) { + LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what()); + err_msg = e.what(); + } catch (...) { + LogWarning("Unknown error while processing request for '%s'", req->GetURI()); + err_msg = "unknown error"; + } + // Reply so the client doesn't hang waiting for the response. + req->WriteHeader("Connection", "close"); + // TODO: Implement specific error formatting for the REST and JSON-RPC servers responses. + req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg); + return false; + }); + assert(g_work_queue); if (g_work_queue->Enqueue(item.get())) { (void)item.release(); /* if true, queue took ownership */ From 6354b4fd7fe819eb13274b212e426a7d10ca75d3 Mon Sep 17 00:00:00 2001 From: furszy Date: Tue, 27 Jan 2026 20:59:33 -0500 Subject: [PATCH 2/5] tests: log node JSON-RPC errors during test setup Currently, if the node replies to any command with an error during the test framework's setup(), we log the generic and not really useful "Unexpected exception" from the BaseException catch, with no further information. This isn't helpful for diagnosing the issue. Fix it by explicitly handling JSONRPCException and logging the response error message and http status code. --- test/functional/test_framework/test_framework.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 1f957564453..3ab351aef4a 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -147,6 +147,9 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass): except subprocess.CalledProcessError as e: self.log.exception(f"Called Process failed with stdout='{e.stdout}'; stderr='{e.stderr}';") self.success = TestStatus.FAILED + except JSONRPCException as e: + self.log.exception(f"Failure during setup: error={e.error}, http_status={e.http_status}") + self.success = TestStatus.FAILED except BaseException: self.log.exception("Unexpected exception") self.success = TestStatus.FAILED From c528dd5f8ccc3955b00bdba869f0a774efa97fe1 Mon Sep 17 00:00:00 2001 From: furszy Date: Thu, 16 Feb 2023 12:20:33 -0300 Subject: [PATCH 3/5] util: introduce general purpose thread pool --- src/test/CMakeLists.txt | 1 + src/test/threadpool_tests.cpp | 325 ++++++++++++++++++++++++++++++++++ src/util/threadpool.h | 211 ++++++++++++++++++++++ 3 files changed, 537 insertions(+) create mode 100644 src/test/threadpool_tests.cpp create mode 100644 src/util/threadpool.h diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 2466a483921..1cae4fc43fd 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -107,6 +107,7 @@ add_executable(test_bitcoin system_ram_tests.cpp system_tests.cpp testnet4_miner_tests.cpp + threadpool_tests.cpp timeoffsets_tests.cpp torcontrol_tests.cpp transaction_tests.cpp diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp new file mode 100644 index 00000000000..46acf1d67d6 --- /dev/null +++ b/src/test/threadpool_tests.cpp @@ -0,0 +1,325 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include + +#include + +// General test values +int NUM_WORKERS_DEFAULT = 0; +constexpr char POOL_NAME[] = "test"; +constexpr auto WAIT_TIMEOUT = 120s; + +struct ThreadPoolFixture { + ThreadPoolFixture() { + NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1; + LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT); + } +}; + +// Test Cases Overview +// 0) Submit task to a non-started pool. +// 1) Submit tasks and verify completion. +// 2) Maintain all threads busy except one. +// 3) Wait for work to finish. +// 4) Wait for result object. +// 5) The task throws an exception, catch must be done in the consumer side. +// 6) Busy workers, help them by processing tasks externally. +// 7) Recursive submission of tasks. +// 8) Submit task when all threads are busy, stop pool and verify task gets executed. +// 9) Congestion test; create more workers than available cores. +// 10) Ensure Interrupt() prevents further submissions. +BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture) + +#define WAIT_FOR(futures) \ + do { \ + for (const auto& f : futures) { \ + BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \ + } \ + } while (0) + +// Block a number of worker threads by submitting tasks that wait on `blocker_future`. +// Returns the futures of the blocking tasks, ensuring all have started and are waiting. +std::vector> BlockWorkers(ThreadPool& threadPool, const std::shared_future& blocker_future, int num_of_threads_to_block) +{ + // Per-thread ready promises to ensure all workers are actually blocked + std::vector> ready_promises(num_of_threads_to_block); + std::vector> ready_futures; + ready_futures.reserve(num_of_threads_to_block); + for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future()); + + // Fill all workers with blocking tasks + std::vector> blocking_tasks; + for (int i = 0; i < num_of_threads_to_block; i++) { + std::promise& ready = ready_promises[i]; + blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() { + ready.set_value(); + blocker_future.wait(); + })); + } + + // Wait until all threads are actually blocked + WAIT_FOR(ready_futures); + return blocking_tasks; +} + +// Test 0, submit task to a non-started pool +BOOST_AUTO_TEST_CASE(submit_task_before_start_fails) +{ + ThreadPool threadPool(POOL_NAME); + BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks"); + return true; + }); +} + +// Test 1, submit tasks and verify completion +BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully) +{ + int num_tasks = 50; + + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic counter = 0; + + // Store futures to ensure completion before checking counter. + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 1; i <= num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter, i]() { + counter.fetch_add(i, std::memory_order_relaxed); + })); + } + + // Wait for all tasks to finish + WAIT_FOR(futures); + int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum. + BOOST_CHECK_EQUAL(counter.load(), expected_value); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); +} + +// Test 2, maintain all threads busy except one +BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + // Single blocking future for all threads + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1); + + // Now execute tasks on the single available worker + // and check that all the tasks are executed. + int num_tasks = 15; + int counter = 0; + + // Store futures to wait on + std::vector> futures(num_tasks); + for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; }); + + WAIT_FOR(futures); + BOOST_CHECK_EQUAL(counter, num_tasks); + + blocker.set_value(); + WAIT_FOR(blocking_tasks); + threadPool.Stop(); + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +// Test 3, wait for work to finish +BOOST_AUTO_TEST_CASE(wait_for_task_to_finish) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic flag = false; + std::future future = threadPool.Submit([&flag]() { + UninterruptibleSleep(200ms); + flag.store(true, std::memory_order_release); + }); + BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready); + BOOST_CHECK(flag.load(std::memory_order_acquire)); +} + +// Test 4, obtain result object +BOOST_AUTO_TEST_CASE(get_result_from_completed_task) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::future future_bool = threadPool.Submit([]() { return true; }); + BOOST_CHECK(future_bool.get()); + + std::future future_str = threadPool.Submit([]() { return std::string("true"); }); + std::string result = future_str.get(); + BOOST_CHECK_EQUAL(result, "true"); +} + +// Test 5, throw exception and catch it on the consumer side +BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + int num_tasks = 5; + std::string err_msg{"something wrong happened"}; + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 0; i < num_tasks; i++) { + futures.emplace_back(threadPool.Submit([err_msg, i]() { + throw std::runtime_error(err_msg + util::ToString(i)); + })); + } + + for (int i = 0; i < num_tasks; i++) { + BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i)); + return true; + }); + } +} + +// Test 6, all workers are busy, help them by processing tasks from outside +BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + + // Now submit tasks and check that none of them are executed. + int num_tasks = 20; + std::atomic counter = 0; + for (int i = 0; i < num_tasks; i++) { + (void)threadPool.Submit([&counter]() { + counter.fetch_add(1, std::memory_order_relaxed); + }); + } + UninterruptibleSleep(100ms); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks); + + // Now process manually + for (int i = 0; i < num_tasks; i++) { + threadPool.ProcessTask(); + } + BOOST_CHECK_EQUAL(counter.load(), num_tasks); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); + blocker.set_value(); + threadPool.Stop(); + WAIT_FOR(blocking_tasks); +} + +// Test 7, submit tasks from other tasks +BOOST_AUTO_TEST_CASE(recursive_task_submission) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise signal; + (void)threadPool.Submit([&]() { + (void)threadPool.Submit([&]() { + signal.set_value(); + }); + }); + + signal.get_future().wait(); + threadPool.Stop(); +} + +// Test 8, submit task when all threads are busy and then stop the pool +BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + + // Submit an extra task that should execute once a worker is free + std::future future = threadPool.Submit([]() { return true; }); + + // At this point, all workers are blocked, and the extra task is queued + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1); + + // Wait a short moment before unblocking the threads to mimic a concurrent shutdown + std::thread thread_unblocker([&blocker]() { + UninterruptibleSleep(300ms); + blocker.set_value(); + }); + + // Stop the pool while the workers are still blocked + threadPool.Stop(); + + // Expect the submitted task to complete + BOOST_CHECK(future.get()); + thread_unblocker.join(); + + // Obviously all the previously blocking tasks should be completed at this point too + WAIT_FOR(blocking_tasks); + + // Pool should be stopped and no workers remaining + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +// Test 9, more workers than available cores (congestion test) +BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2× + + int num_tasks = 200; + std::atomic counter{0}; + + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 0; i < num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter] { + counter.fetch_add(1, std::memory_order_relaxed); + })); + } + + WAIT_FOR(futures); + BOOST_CHECK_EQUAL(counter.load(), num_tasks); +} + +// Test 10, Interrupt() prevents further submissions +BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions) +{ + // 1) Interrupt from main thread + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + threadPool.Interrupt(); + BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks"); + return true; + }); + + // Reset pool + threadPool.Stop(); + + // 2) Interrupt() from a worker thread + // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks. + threadPool.Start(/*num_workers=*/3); + std::atomic counter{0}; + std::promise blocker; + const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1); + threadPool.Submit([&threadPool, &counter]{ + threadPool.Interrupt(); + counter.fetch_add(1, std::memory_order_relaxed); + }).get(); + blocker.set_value(); // unblock worker + + BOOST_CHECK_EQUAL(counter.load(), 1); + threadPool.Stop(); + WAIT_FOR(blocking_tasks); + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 00000000000..b75a94157eb --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,211 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_THREADPOOL_H +#define BITCOIN_UTIL_THREADPOOL_H + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Fixed-size thread pool for running arbitrary tasks concurrently. + * + * The thread pool maintains a set of worker threads that consume and execute + * tasks submitted through Submit(). Once started, tasks can be queued and + * processed asynchronously until Stop() is called. + * + * ### Thread-safety and lifecycle + * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. + * Calling `Stop()` from a worker thread will deadlock, as it waits for all + * workers to join, including the current one. + * + * - `Submit()` can be called from any thread, including workers. It safely + * enqueues new work for execution as long as the pool has active workers. + * + * - `Interrupt()` stops new task submission and lets queued ones drain + * in the background. Callers can continue other shutdown steps and call + * Stop() at the end to ensure no remaining tasks are left to execute. + * + * - `Stop()` prevents further task submission and blocks until all the + * queued ones are completed. + */ +class ThreadPool +{ +private: + std::string m_name; + Mutex m_mutex; + std::queue> m_work_queue GUARDED_BY(m_mutex); + std::condition_variable m_cv; + // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool. + // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. + // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable + bool m_interrupt GUARDED_BY(m_mutex){false}; + std::vector m_workers GUARDED_BY(m_mutex); + + void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + WAIT_LOCK(m_mutex, wait_lock); + for (;;) { + std::packaged_task task; + { + // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. + if (!m_interrupt && m_work_queue.empty()) { + // Block until the pool is interrupted or a task is available. + m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); }); + } + + // If stopped and no work left, exit worker + if (m_interrupt && m_work_queue.empty()) { + return; + } + + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + + { + // Execute the task without the lock + REVERSE_LOCK(wait_lock, m_mutex); + task(); + } + } + } + +public: + explicit ThreadPool(const std::string& name) : m_name(name) {} + + ~ThreadPool() + { + Stop(); // In case it hasn't been stopped. + } + + /** + * @brief Start worker threads. + * + * Creates and launches `num_workers` threads that begin executing tasks + * from the queue. If the pool is already started, throws. + * + * Must be called from a controller (non-worker) thread. + */ + void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + assert(num_workers > 0); + LOCK(m_mutex); + if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); + m_interrupt = false; // Reset + + // Create workers + m_workers.reserve(num_workers); + for (int i = 0; i < num_workers; i++) { + m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); }); + } + } + + /** + * @brief Stop all worker threads and wait for them to exit. + * + * Sets the interrupt flag, wakes all waiting workers, and joins them. + * Any remaining tasks in the queue will be processed before returning. + * + * Must be called from a controller (non-worker) thread. + */ + void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + // Notify workers and join them + std::vector threads_to_join; + { + LOCK(m_mutex); + // Ensure Stop() is not called from a worker thread while workers are still registered, + // otherwise a self-join deadlock would occur. + auto id = std::this_thread::get_id(); + for (const auto& worker : m_workers) assert(worker.get_id() != id); + // Early shutdown to return right away on any concurrent Submit() call + m_interrupt = true; + threads_to_join.swap(m_workers); + } + m_cv.notify_all(); + for (auto& worker : threads_to_join) worker.join(); + // Since we currently wait for tasks completion, sanity-check empty queue + WITH_LOCK(m_mutex, Assume(m_work_queue.empty())); + // Note: m_interrupt is left true until next Start() + } + + /** + * @brief Enqueues a new task for asynchronous execution. + * + * Returns a `std::future` that provides the task's result or propagates + * any exception it throws. + * Note: Ignoring the returned future requires guarding the task against + * uncaught exceptions, as they would otherwise be silently discarded. + */ + template [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + auto Submit(F&& fn) + { + std::packaged_task task{std::forward(fn)}; + auto future{task.get_future()}; + { + LOCK(m_mutex); + if (m_interrupt || m_workers.empty()) { + throw std::runtime_error("No active workers; cannot accept new tasks"); + } + m_work_queue.emplace(std::move(task)); + } + m_cv.notify_one(); + return future; + } + + /** + * @brief Execute a single queued task synchronously. + * Removes one task from the queue and executes it on the calling thread. + */ + void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + std::packaged_task task; + { + LOCK(m_mutex); + if (m_work_queue.empty()) return; + + // Pop the task + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + task(); + } + + /** + * @brief Stop accepting new tasks and begin asynchronous shutdown. + * + * Wakes all worker threads so they can drain the queue and exit. + * Unlike Stop(), this function does not wait for threads to finish. + */ + void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + WITH_LOCK(m_mutex, m_interrupt = true); + m_cv.notify_all(); + } + + size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_work_queue.size()); + } + + size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_workers.size()); + } +}; + +#endif // BITCOIN_UTIL_THREADPOOL_H From c323f882ed3841401edee90ab5261d68215ab316 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Mon, 22 Sep 2025 13:21:50 -0400 Subject: [PATCH 4/5] fuzz: add test case for threadpool Co-authored-by: furszy --- src/test/fuzz/CMakeLists.txt | 1 + src/test/fuzz/threadpool.cpp | 117 +++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 src/test/fuzz/threadpool.cpp diff --git a/src/test/fuzz/CMakeLists.txt b/src/test/fuzz/CMakeLists.txt index 3d2ff8f3aa7..fc82fdc03a2 100644 --- a/src/test/fuzz/CMakeLists.txt +++ b/src/test/fuzz/CMakeLists.txt @@ -120,6 +120,7 @@ add_executable(fuzz string.cpp strprintf.cpp system.cpp + threadpool.cpp timeoffsets.cpp torcontrol.cpp transaction.cpp diff --git a/src/test/fuzz/threadpool.cpp b/src/test/fuzz/threadpool.cpp new file mode 100644 index 00000000000..293aa63e0fe --- /dev/null +++ b/src/test/fuzz/threadpool.cpp @@ -0,0 +1,117 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +#include +#include + +#include +#include +#include + +struct ExpectedException : std::runtime_error { + explicit ExpectedException(const std::string& msg) : std::runtime_error(msg) {} +}; + +struct ThrowTask { + void operator()() const { throw ExpectedException("fail"); } +}; + +struct CounterTask { + std::atomic_uint32_t& m_counter; + explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {} + void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); } +}; + +// Waits for a future to complete. Increments 'fail_counter' if the expected exception is thrown. +static void GetFuture(std::future& future, uint32_t& fail_counter) +{ + try { + future.get(); + } catch (const ExpectedException&) { + fail_counter++; + } catch (...) { + assert(false && "Unexpected exception type"); + } +} + +// Global thread pool for fuzzing. Persisting it across iterations prevents +// the excessive thread creation/destruction overhead that can lead to +// instability in the fuzzing environment. +// This is also how we use it in the app's lifecycle. +ThreadPool g_pool{"fuzz"}; +Mutex g_pool_mutex; +// Global to verify we always have the same number of threads. +size_t g_num_workers = 3; + +static void StartPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex) +{ + LOCK(g_pool_mutex); + if (g_pool.WorkersCount() == g_num_workers) return; + g_pool.Start(g_num_workers); +} + +static void setup_threadpool_test() +{ + // Disable logging entirely. It seems to cause memory leaks. + LogInstance().DisableLogging(); +} + +FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex) +{ + // Because LibAFL calls fork() after calling the init setup function, + // the child processes end up having one thread active and no workers. + // To work around this limitation, start thread pool inside the first runner. + StartPoolIfNeeded(); + + FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size()); + + const uint32_t num_tasks = fuzzed_data_provider.ConsumeIntegralInRange(0, 1024); + assert(g_pool.WorkersCount() == g_num_workers); + assert(g_pool.WorkQueueSize() == 0); + + // Counters + std::atomic_uint32_t task_counter{0}; + uint32_t fail_counter{0}; + uint32_t expected_task_counter{0}; + uint32_t expected_fail_tasks{0}; + + std::queue> futures; + for (uint32_t i = 0; i < num_tasks; ++i) { + const bool will_throw = fuzzed_data_provider.ConsumeBool(); + const bool wait_immediately = fuzzed_data_provider.ConsumeBool(); + + std::future fut; + if (will_throw) { + expected_fail_tasks++; + fut = g_pool.Submit(ThrowTask{}); + } else { + expected_task_counter++; + fut = g_pool.Submit(CounterTask{task_counter}); + } + + // If caller wants to wait immediately, consume the future here (safe). + if (wait_immediately) { + // Waits for this task to complete immediately; prior queued tasks may also complete + // as they were queued earlier. + GetFuture(fut, fail_counter); + } else { + // Store task for a posterior check + futures.emplace(std::move(fut)); + } + } + + // Drain remaining futures + while (!futures.empty()) { + auto fut = std::move(futures.front()); + futures.pop(); + GetFuture(fut, fail_counter); + } + + assert(g_pool.WorkQueueSize() == 0); + assert(task_counter.load() == expected_task_counter); + assert(fail_counter == expected_fail_tasks); +} From 38fd85c676a072ebf256e806beda9d7533790baa Mon Sep 17 00:00:00 2001 From: furszy Date: Wed, 28 Jan 2026 15:08:01 -0500 Subject: [PATCH 5/5] http: replace WorkQueue and threads handling for ThreadPool Replace the HTTP server's WorkQueue implementation and single threads handling code with ThreadPool for processing HTTP requests. The ThreadPool class encapsulates all this functionality on a reusable class, properly unit and fuzz tested (the previous code was not unit nor fuzz tested at all). This cleanly separates responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution. It simplifies init, shutdown and requests tracking. This also allows us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free structure, task prioritization or any other performance improvement in the future, without having to deal with HTTP code that lives on a different layer. --- src/httpserver.cpp | 140 ++++++++------------------------------------- src/httpserver.h | 9 --- 2 files changed, 24 insertions(+), 125 deletions(-) diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 61df454af8e..b6a74d1c015 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -49,83 +50,6 @@ using common::InvalidPortErrMsg; /** Maximum size of http request (request line + headers) */ static const size_t MAX_HEADERS_SIZE = 8192; -/** HTTP request work item */ -class HTTPWorkItem final : public HTTPClosure -{ -public: - HTTPWorkItem(std::unique_ptr _req, const std::string &_path, const HTTPRequestHandler& _func): - req(std::move(_req)), path(_path), func(_func) - { - } - void operator()() override - { - func(req.get(), path); - } - - std::unique_ptr req; - -private: - std::string path; - HTTPRequestHandler func; -}; - -/** Simple work queue for distributing work over multiple threads. - * Work items are simply callable objects. - */ -template -class WorkQueue -{ -private: - Mutex cs; - std::condition_variable cond GUARDED_BY(cs); - std::deque> queue GUARDED_BY(cs); - bool running GUARDED_BY(cs){true}; - const size_t maxDepth; - -public: - explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth) - { - } - /** Precondition: worker threads have all stopped (they have been joined). - */ - ~WorkQueue() = default; - /** Enqueue a work item */ - bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - if (!running || queue.size() >= maxDepth) { - return false; - } - queue.emplace_back(std::unique_ptr(item)); - cond.notify_one(); - return true; - } - /** Thread function */ - void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - while (true) { - std::unique_ptr i; - { - WAIT_LOCK(cs, lock); - while (running && queue.empty()) - cond.wait(lock); - if (!running && queue.empty()) - break; - i = std::move(queue.front()); - queue.pop_front(); - } - (*i)(); - } - } - /** Interrupt and exit loops */ - void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - running = false; - cond.notify_all(); - } -}; - struct HTTPPathHandler { HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler): @@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr; static struct evhttp* eventHTTP = nullptr; //! List of subnets to allow RPC connections from static std::vector rpc_allow_subnets; -//! Work queue for handling longer requests off the event loop thread -static std::unique_ptr> g_work_queue{nullptr}; //! Handlers for (sub)paths static GlobalMutex g_httppathhandlers_mutex; static std::vector pathHandlers GUARDED_BY(g_httppathhandlers_mutex); //! Bound listening sockets static std::vector boundSockets; +//! Http thread pool - future: encapsulate in HttpContext +static ThreadPool g_threadpool_http("http"); +static int g_max_queue_depth{100}; /** * @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests` @@ -327,10 +252,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg) // Dispatch to worker thread if (i != iend) { - auto item = std::make_unique(std::move(hreq), path, [fn = i->handler](HTTPRequest* req, const std::string& path_inner) { + if (static_cast(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) { + LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); + hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + return; + } + + auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() { std::string err_msg; try { - return fn(req, path_inner); + fn(req.get(), in_path); + return; } catch (const std::exception& e) { LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what()); err_msg = e.what(); @@ -342,16 +274,9 @@ static void http_request_cb(struct evhttp_request* req, void* arg) req->WriteHeader("Connection", "close"); // TODO: Implement specific error formatting for the REST and JSON-RPC servers responses. req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg); - return false; - }); + }; - assert(g_work_queue); - if (g_work_queue->Enqueue(item.get())) { - (void)item.release(); /* if true, queue took ownership */ - } else { - LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); - } + [[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))}; } else { hreq->WriteReply(HTTP_NOT_FOUND); } @@ -429,13 +354,6 @@ static bool HTTPBindAddresses(struct evhttp* http) return !boundSockets.empty(); } -/** Simple wrapper to set thread name and run work queue */ -static void HTTPWorkQueueRun(WorkQueue* queue, int worker_num) -{ - util::ThreadRename(strprintf("httpworker.%i", worker_num)); - queue->Run(); -} - /** libevent event log callback */ static void libevent_log_cb(int severity, const char *msg) { @@ -492,10 +410,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt) } LogDebug(BCLog::HTTP, "Initialized HTTP server\n"); - int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth); + g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); + LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth); - g_work_queue = std::make_unique>(workQueueDepth); // transfer ownership to eventBase/HTTP via .release() eventBase = base_ctr.release(); eventHTTP = http_ctr.release(); @@ -511,17 +428,13 @@ void UpdateHTTPServerLogging(bool enable) { } static std::thread g_thread_http; -static std::vector g_thread_http_workers; void StartHTTPServer() { int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads); + g_threadpool_http.Start(rpcThreads); g_thread_http = std::thread(ThreadHTTP, eventBase); - - for (int i = 0; i < rpcThreads; i++) { - g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i); - } } void InterruptHTTPServer() @@ -531,21 +444,17 @@ void InterruptHTTPServer() // Reject requests on current connections evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); } - if (g_work_queue) { - g_work_queue->Interrupt(); - } + // Interrupt pool after disabling requests + g_threadpool_http.Interrupt(); } void StopHTTPServer() { LogDebug(BCLog::HTTP, "Stopping HTTP server\n"); - if (g_work_queue) { - LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); - for (auto& thread : g_thread_http_workers) { - thread.join(); - } - g_thread_http_workers.clear(); - } + + LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); + g_threadpool_http.Stop(); + // Unlisten sockets, these are what make the event loop running, which means // that after this and all connections are closed the event loop will quit. for (evhttp_bound_socket *socket : boundSockets) { @@ -573,7 +482,6 @@ void StopHTTPServer() event_base_free(eventBase); eventBase = nullptr; } - g_work_queue.reset(); LogDebug(BCLog::HTTP, "Stopped HTTP server\n"); } diff --git a/src/httpserver.h b/src/httpserver.h index 1ef3aaeb0ab..5461480d44f 100644 --- a/src/httpserver.h +++ b/src/httpserver.h @@ -159,15 +159,6 @@ public: */ std::optional GetQueryParameterFromUri(const char* uri, const std::string& key); -/** Event handler closure. - */ -class HTTPClosure -{ -public: - virtual void operator()() = 0; - virtual ~HTTPClosure() = default; -}; - /** Event class. This can be used either as a cross-thread trigger or as a timer. */ class HTTPEvent