Merge bitcoin/bitcoin#34562: ThreadPool follow-ups, proactive shutdown and HasReason dependency cleanup

408d5b12e80151feded31c2a5509e2dc5f15edf3 test: include response body in non-JSON HTTP error msg (Matthew Zipkin)
9dc653b3b4f3049b0e742499b762f7c13bb006cc test: threadpool, add coverage for all Submit() errors (furszy)
ce2a984ee324d37ba1dd7c2c4e27e40e0508bedc test: cleanup, use HasReason in threadpool_tests.cpp (l0rinc)
d9c6769d0324b65121935b7c8a285c6421fe74a6 test: refactor, decouple HasReason from test framework machinery (furszy)
dbbb780af02d850a1f9257f18610cfb9de9cb828 test: move and simplify BOOST_CHECK ostream helpers (Hodlinator)
3b7cbcafcb9b318bf1fa00a3499f514c5ebe9bb6 test: ensure Stop() thread helps drain the queue (seduless)
ca101a2315774f0ed65da633ba99899fd0dad740 test: coverage for queued tasks completion after interrupt (furszy)
bf2c607aaa22d253b9367c11b0a198bd4244ad2f threadpool: active-wait during shutdown (furszy)
e88d2744301a434064714f0a21e1395d41ac3984 test: add threadpool Start-Stop race coverage (furszy)
8cd4a4363fb85f5487a19ace82aa0d12d5fab450 threadpool: guard against Start-Stop race (furszy)
9ff1e82e7dbdf31ddf1c534853da4581a1f41bd5 test: cleanup, block threads via semaphore instead of shared_future (l0rinc)

Pull request description:

  A few follow-ups to #33689, includes:

  1) `ThreadPool` active-wait during shutdown:
  Instead of just waiting for workers to finish processing tasks, `Stop()` now helps them actively.
  This speeds up the JSON-RPC and REST server shutdown, resulting in a faster node shutdown when many requests remain unhandled. This wasn't included in the original PR due to the behavior change this introduces.

  2) Decouple `HasReason` from the unit test framework machinery
  This avoids providing the entire unit test framework dependency to low-level tests that only require access to the `HasReason` utility class. Examples are: `reverselock_tests.cpp`, `sync_tests.cpp`, `util_check_tests.cpp`, `util_string_tests.cpp`, `script_parse_tests.cpp` and `threadpool_tests.cpp`. These tests no longer gain access to unnecessary components like the chainstate, node context, caches, etc. It includes l0rinc's `threadpool_tests.cpp` `HasReason` changes.

  3) Include response body in non-JSON HTTP error messages
  Straight from pinheadmz [comment](https://github.com/bitcoin/bitcoin/pull/33689#discussion_r2783817192), it makes debugging CI issues easier.

ACKs for top commit:
  maflcko:
    review ACK 408d5b12e80151feded31c2a5509e2dc5f15edf3 🕗
  achow101:
    ACK 408d5b12e80151feded31c2a5509e2dc5f15edf3
  hodlinator:
    re-ACK 408d5b12e80151feded31c2a5509e2dc5f15edf3

Tree-SHA512: 57aa0ef96886f32bf95a0bd7f87c878d31c9df9e34cb96de615eee703ce0824b5cfdf8f5c9cd19a3594559994295b5810c38c94f5efd6291cbbd83a95473357a
This commit is contained in:
Ava Chow 2026-02-26 12:44:11 -08:00
commit b6b8f8ac55
No known key found for this signature in database
GPG Key ID: 17565732E08E5E41
65 changed files with 295 additions and 126 deletions

View File

@ -5,6 +5,7 @@
#include <ipc/process.h>
#include <ipc/test/ipc_test.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp>

View File

@ -3,7 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <arith_uint256.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <uint256.h>
#include <boost/test/unit_test.hpp>

View File

@ -10,6 +10,7 @@
#include <test/util/random.h>
#include <test/util/txmempool.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp>

View File

@ -12,6 +12,7 @@
#include <node/miner.h>
#include <pow.h>
#include <test/util/blockfilter.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <validation.h>

View File

@ -3,7 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <test/data/blockfilters.json.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <blockfilter.h>
#include <core_io.h>

View File

@ -14,6 +14,7 @@
#include <validation.h>
#include <boost/test/unit_test.hpp>
#include <test/util/common.h>
#include <test/util/logging.h>
#include <test/util/setup_common.h>

View File

@ -13,6 +13,7 @@
#include <random.h>
#include <serialize.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <uint256.h>

View File

@ -6,6 +6,7 @@
#include <clientversion.h>
#include <coins.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/poolresourcetester.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -17,6 +17,7 @@
#include <crypto/muhash.h>
#include <random.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <dbwrapper.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <uint256.h>

View File

@ -5,6 +5,7 @@
#include <common/args.h>
#include <common/settings.h>
#include <logging.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <univalue.h>
#include <util/strencodings.h>

View File

@ -8,6 +8,7 @@
#include <headerssync.h>
#include <net_processing.h>
#include <pow.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <validation.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <httpserver.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp>

View File

@ -5,6 +5,7 @@
#include <chainparams.h>
#include <consensus/validation.h>
#include <interfaces/chain.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <script/solver.h>
#include <validation.h>

View File

@ -9,6 +9,7 @@
#include <span.h>
#include <streams.h>
#include <secp256k1_extrakeys.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <uint256.h>

View File

@ -6,6 +6,7 @@
#include <logging.h>
#include <logging/timer.h>
#include <scheduler.h>
#include <test/util/common.h>
#include <test/util/logging.h>
#include <test/util/setup_common.h>
#include <tinyformat.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <consensus/merkle.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <merkleblock.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <uint256.h>

View File

@ -25,6 +25,7 @@
#include <versionbits.h>
#include <pow.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <memory>

View File

@ -5,6 +5,7 @@
#include <minisketch.h>
#include <node/minisketchwrapper.h>
#include <random.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -9,6 +9,7 @@
#include <script/script_error.h>
#include <script/sign.h>
#include <script/signingprovider.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <test/util/transaction_utils.h>
#include <tinyformat.h>

View File

@ -15,6 +15,7 @@
#include <serialize.h>
#include <span.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/net.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -10,6 +10,7 @@
#include <protocol.h>
#include <serialize.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>
#include <util/translation.h>

View File

@ -7,6 +7,7 @@
#include <rpc/server.h>
#include <boost/test/unit_test.hpp>
#include <test/util/common.h>
#include <test/util/setup_common.h>
using node::NodeContext;

View File

@ -10,6 +10,7 @@
#include <pubkey.h>
#include <script/sign.h>
#include <script/signingprovider.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <test/util/transaction_utils.h>

View File

@ -5,6 +5,7 @@
#include <common/pcp.h>
#include <netbase.h>
#include <test/util/logging.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/time.h>

View File

@ -6,6 +6,7 @@
#include <chainparams.h>
#include <pow.h>
#include <test/util/random.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/chaintype.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <rest.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp>

View File

@ -3,7 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <sync.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <boost/test/unit_test.hpp>

View File

@ -9,6 +9,7 @@
#include <rpc/client.h>
#include <rpc/server.h>
#include <rpc/util.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <univalue.h>
#include <util/time.h>

View File

@ -5,7 +5,7 @@
#include <core_io.h>
#include <script/script.h>
#include <util/strencodings.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <boost/test/unit_test.hpp>

View File

@ -10,6 +10,7 @@
#include <script/script.h>
#include <script/signingprovider.h>
#include <script/solver.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>

View File

@ -19,6 +19,7 @@
#include <streams.h>
#include <test/util/json.h>
#include <test/util/random.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <test/util/transaction_utils.h>
#include <util/fs.h>
@ -26,7 +27,6 @@
#include <util/string.h>
#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

View File

@ -5,6 +5,7 @@
#include <hash.h>
#include <serialize.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>

View File

@ -11,6 +11,7 @@
#include <serialize.h>
#include <streams.h>
#include <test/data/sighash.json.h>
#include <test/util/common.h>
#include <test/util/json.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -4,6 +4,7 @@
#include <common/system.h>
#include <compat/compat.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/sock.h>
#include <util/threadinterrupt.h>

View File

@ -5,6 +5,7 @@
#include <flatfile.h>
#include <node/blockstorage.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <util/fs.h>

View File

@ -3,7 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <sync.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <boost/test/unit_test.hpp>

View File

@ -6,6 +6,7 @@
#include <bitcoin-build-config.h> // IWYU pragma: keep
#include <common/run_command.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <univalue.h>
#include <util/string.h>

View File

@ -5,11 +5,14 @@
#include <common/system.h>
#include <logging.h>
#include <random.h>
#include <test/util/common.h>
#include <util/string.h>
#include <util/threadpool.h>
#include <util/time.h>
#include <boost/test/unit_test.hpp>
#include <latch>
#include <semaphore>
// General test values
int NUM_WORKERS_DEFAULT = 0;
@ -35,6 +38,9 @@ struct ThreadPoolFixture {
// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
// 9) Congestion test; create more workers than available cores.
// 10) Ensure Interrupt() prevents further submissions.
// 11) Start() must not cause a deadlock when called during Stop().
// 12) Ensure queued tasks complete after Interrupt().
// 13) Ensure the Stop() calling thread helps drain the queue.
BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
#define WAIT_FOR(futures) \
@ -51,36 +57,54 @@ template <typename F>
return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
}
// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
// Block a number of worker threads by submitting tasks that wait on `release_sem`.
// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::shared_future<void>& blocker_future, int num_of_threads_to_block)
std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
{
// Per-thread ready promises to ensure all workers are actually blocked
std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
std::vector<std::future<void>> ready_futures;
ready_futures.reserve(num_of_threads_to_block);
for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
// Fill all workers with blocking tasks
std::vector<std::future<void>> blocking_tasks;
for (int i = 0; i < num_of_threads_to_block; i++) {
std::promise<void>& ready = ready_promises[i];
blocking_tasks.emplace_back(Submit(threadPool, [blocker_future, &ready]() {
ready.set_value();
blocker_future.wait();
}));
}
// Wait until all threads are actually blocked
WAIT_FOR(ready_futures);
assert(threadPool.WorkersCount() >= num_of_threads_to_block);
std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
ready.count_down();
release_sem.acquire();
});
ready.wait();
return blocking_tasks;
}
// Test 0, submit task to a non-started pool
BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
// Test 0, submit task to a non-started, interrupted, or stopped pool
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
{
ThreadPool threadPool(POOL_NAME);
auto res = threadPool.Submit([]{ return false; });
const auto fn_empty = [&] {};
// Never started: Inactive
auto res = threadPool.Submit(fn_empty);
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
// Interrupted (workers still alive): Interrupted, and Start() must be rejected too
std::counting_semaphore<> blocker(0);
threadPool.Start(NUM_WORKERS_DEFAULT);
const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
threadPool.Interrupt();
res = threadPool.Submit(fn_empty);
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
blocker.release(NUM_WORKERS_DEFAULT);
WAIT_FOR(blocking_tasks);
// Interrupted then stopped: Inactive
threadPool.Stop();
res = threadPool.Submit(fn_empty);
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
// Started then stopped: Inactive
threadPool.Start(NUM_WORKERS_DEFAULT);
threadPool.Stop();
res = threadPool.Submit(fn_empty);
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
}
@ -115,10 +139,8 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
// Single blocking future for all threads
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1);
std::counting_semaphore<> blocker(0);
const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
// Now execute tasks on the single available worker
// and check that all the tasks are executed.
@ -132,7 +154,7 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
WAIT_FOR(futures);
BOOST_CHECK_EQUAL(counter, num_tasks);
blocker.set_value();
blocker.release(NUM_WORKERS_DEFAULT - 1);
WAIT_FOR(blocking_tasks);
threadPool.Stop();
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
@ -171,21 +193,17 @@ BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
int num_tasks = 5;
std::string err_msg{"something wrong happened"};
const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
const int num_tasks = 5;
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(Submit(threadPool, [err_msg, i]() {
throw std::runtime_error(err_msg + util::ToString(i));
}));
futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
}
for (int i = 0; i < num_tasks; i++) {
BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) {
BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
return true;
});
BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
}
}
@ -195,9 +213,8 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
std::counting_semaphore<> blocker(0);
const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
// Now submit tasks and check that none of them are executed.
int num_tasks = 20;
@ -216,7 +233,7 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
}
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
blocker.set_value();
blocker.release(NUM_WORKERS_DEFAULT);
threadPool.Stop();
WAIT_FOR(blocking_tasks);
}
@ -244,9 +261,8 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::promise<void> blocker;
std::shared_future<void> blocker_future(blocker.get_future());
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
std::counting_semaphore<> blocker(0);
const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
// Submit an extra task that should execute once a worker is free
std::future<bool> future = Submit(threadPool, []() { return true; });
@ -257,7 +273,7 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
// Wait a short moment before unblocking the threads to mimic a concurrent shutdown
std::thread thread_unblocker([&blocker]() {
UninterruptibleSleep(300ms);
blocker.set_value();
blocker.release(NUM_WORKERS_DEFAULT);
});
// Stop the pool while the workers are still blocked
@ -314,13 +330,13 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
// One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
threadPool.Start(/*num_workers=*/3);
std::atomic<int> counter{0};
std::promise<void> blocker;
const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1);
std::counting_semaphore<> blocker(0);
const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
Submit(threadPool, [&threadPool, &counter]{
threadPool.Interrupt();
counter.fetch_add(1, std::memory_order_relaxed);
}).get();
blocker.set_value(); // unblock worker
blocker.release(1); // unblock worker
BOOST_CHECK_EQUAL(counter.load(), 1);
threadPool.Stop();
@ -328,4 +344,102 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
// Test 11, Start() must not cause a deadlock when called during Stop()
BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
// Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
std::counting_semaphore<> workers_blocker(0);
const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
// Stop() takes ownership of the workers before joining them, so WorkersCount()
// hits 0 the moment Stop() is waiting for them to join. That is our signal
// to call Start() right into the middle of the joining phase.
while (threadPool.WorkersCount() != 0) {
std::this_thread::yield(); // let the OS breathe so it can switch context
}
// Now we know for sure the stopper thread is hanging while workers are still alive.
// Restart the pool and resume workers so the stopper thread can proceed.
// This will throw an exception only if the pool handles Start-Stop race properly,
// otherwise it will proceed and hang the stopper_thread.
try {
threadPool.Start(NUM_WORKERS_DEFAULT);
} catch (std::exception& e) {
BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
}
workers_blocker.release(NUM_WORKERS_DEFAULT);
WAIT_FOR(blocking_tasks);
// If Stop() is stuck, joining the stopper thread will deadlock
stopper_thread.join();
}
// Test 12, queued tasks complete after Interrupt()
BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::counting_semaphore<> blocker(0);
const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
// Queue tasks while all workers are busy, then interrupt
std::atomic<int> counter{0};
const int num_tasks = 10;
std::vector<std::future<void>> futures;
futures.reserve(num_tasks);
for (int i = 0; i < num_tasks; i++) {
futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
}
threadPool.Interrupt();
// Queued tasks must still complete despite the interrupt
blocker.release(NUM_WORKERS_DEFAULT);
WAIT_FOR(futures);
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
threadPool.Stop();
WAIT_FOR(blocking_tasks);
}
// Test 13, ensure the Stop() calling thread helps drain the queue
BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
{
ThreadPool threadPool(POOL_NAME);
threadPool.Start(NUM_WORKERS_DEFAULT);
std::counting_semaphore<> blocker(0);
const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
auto main_thread_id = std::this_thread::get_id();
std::atomic<int> main_thread_tasks{0};
const size_t num_tasks = 20;
for (size_t i = 0; i < num_tasks; i++) {
(void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
if (std::this_thread::get_id() == main_thread_id)
main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
});
}
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
// Delay release so Stop() drain all tasks from the calling thread
std::thread unblocker([&blocker, &threadPool]() {
while (threadPool.WorkQueueSize() > 0) {
std::this_thread::yield();
}
blocker.release(NUM_WORKERS_DEFAULT);
});
threadPool.Stop();
unblocker.join();
// Check the main thread processed all tasks
BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
WAIT_FOR(blocking_tasks);
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -26,6 +26,7 @@
#include <script/signingprovider.h>
#include <script/solver.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/json.h>
#include <test/util/random.h>
#include <test/util/script.h>

View File

@ -8,6 +8,7 @@
#include <node/txdownloadman_impl.h>
#include <primitives/transaction.h>
#include <script/script.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <validation.h>

View File

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <index/txospenderindex.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <validation.h>

View File

@ -11,6 +11,7 @@
#include <script/script.h>
#include <serialize.h>
#include <streams.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/script.h>
#include <test/util/setup_common.h>

View File

@ -4,6 +4,7 @@
#include <node/txreconciliation.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp>

View File

@ -11,6 +11,7 @@
#include <primitives/transaction.h>
#include <random.h>
#include <script/script.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <test/util/txmempool.h>
#include <validation.h>

View File

@ -4,7 +4,7 @@
#include <primitives/transaction_identifier.h>
#include <streams.h>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <uint256.h>
#include <util/strencodings.h>

55
src/test/util/common.h Normal file
View File

@ -0,0 +1,55 @@
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_TEST_UTIL_COMMON_H
#define BITCOIN_TEST_UTIL_COMMON_H
#include <ostream>
#include <optional>
#include <string>
/**
* BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
* Use as
* BOOST_CHECK_EXCEPTION(code that throws, exception type, HasReason("foo"));
*/
class HasReason
{
public:
explicit HasReason(std::string_view reason) : m_reason(reason) {}
bool operator()(std::string_view s) const { return s.find(m_reason) != std::string_view::npos; }
bool operator()(const std::exception& e) const { return (*this)(e.what()); }
private:
const std::string m_reason;
};
// Make types usable in BOOST_CHECK_* @{
namespace std {
template <typename T> requires std::is_enum_v<T>
inline std::ostream& operator<<(std::ostream& os, const T& e)
{
return os << static_cast<std::underlying_type_t<T>>(e);
}
template <typename T>
inline std::ostream& operator<<(std::ostream& os, const std::optional<T>& v)
{
return v ? os << *v
: os << "std::nullopt";
}
} // namespace std
template <typename T>
concept HasToString = requires(const T& t) { t.ToString(); };
template <HasToString T>
inline std::ostream& operator<<(std::ostream& os, const T& obj)
{
return os << obj.ToString();
}
// @}
#endif // BITCOIN_TEST_UTIL_COMMON_H

View File

@ -620,26 +620,3 @@ CBlock getBlock13b8a()
stream >> TX_WITH_WITNESS(block);
return block;
}
std::ostream& operator<<(std::ostream& os, const arith_uint256& num)
{
return os << num.ToString();
}
std::ostream& operator<<(std::ostream& os, const uint160& num)
{
return os << num.ToString();
}
std::ostream& operator<<(std::ostream& os, const uint256& num)
{
return os << num.ToString();
}
std::ostream& operator<<(std::ostream& os, const Txid& txid) {
return os << txid.ToString();
}
std::ostream& operator<<(std::ostream& os, const Wtxid& wtxid) {
return os << wtxid.ToString();
}

View File

@ -261,43 +261,4 @@ std::unique_ptr<T> MakeNoLogFileContext(const ChainType chain_type = ChainType::
CBlock getBlock13b8a();
// Make types usable in BOOST_CHECK_* @{
namespace std {
template <typename T> requires std::is_enum_v<T>
inline std::ostream& operator<<(std::ostream& os, const T& e)
{
return os << static_cast<std::underlying_type_t<T>>(e);
}
template <typename T>
inline std::ostream& operator<<(std::ostream& os, const std::optional<T>& v)
{
return v ? os << *v
: os << "std::nullopt";
}
} // namespace std
std::ostream& operator<<(std::ostream& os, const arith_uint256& num);
std::ostream& operator<<(std::ostream& os, const uint160& num);
std::ostream& operator<<(std::ostream& os, const uint256& num);
std::ostream& operator<<(std::ostream& os, const Txid& txid);
std::ostream& operator<<(std::ostream& os, const Wtxid& wtxid);
// @}
/**
* BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
* Use as
* BOOST_CHECK_EXCEPTION(code that throws, exception type, HasReason("foo"));
*/
class HasReason
{
public:
explicit HasReason(std::string_view reason) : m_reason(reason) {}
bool operator()(std::string_view s) const { return s.find(m_reason) != std::string_view::npos; }
bool operator()(const std::exception& e) const { return (*this)(e.what()); }
private:
const std::string m_reason;
};
#endif // BITCOIN_TEST_UTIL_SETUP_COMMON_H

View File

@ -5,7 +5,7 @@
#include <util/check.h>
#include <boost/test/unit_test.hpp>
#include <test/util/setup_common.h>
#include <test/util/common.h>
BOOST_AUTO_TEST_SUITE(util_check_tests)

View File

@ -7,7 +7,8 @@
#include <vector>
#include <boost/test/unit_test.hpp>
#include <test/util/setup_common.h>
#include <test/util/common.h>
#include <tinyformat.h>
using namespace util;
using util::detail::CheckNumFormatSpecifiers;

View File

@ -9,6 +9,7 @@
#include <script/parsing.h>
#include <span.h>
#include <sync.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <uint256.h>

View File

@ -10,6 +10,7 @@
#include <node/miner.h>
#include <pow.h>
#include <random.h>
#include <test/util/common.h>
#include <test/util/random.h>
#include <test/util/script.h>
#include <test/util/setup_common.h>

View File

@ -11,6 +11,7 @@
#include <script/script.h>
#include <sync.h>
#include <test/util/chainstate.h>
#include <test/util/common.h>
#include <test/util/coins.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -12,6 +12,7 @@
#include <rpc/blockchain.h>
#include <sync.h>
#include <test/util/chainstate.h>
#include <test/util/common.h>
#include <test/util/logging.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -5,6 +5,7 @@
#include <sync.h>
#include <test/util/coins.h>
#include <test/util/random.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <validation.h>

View File

@ -6,6 +6,7 @@
#include <chainparams.h>
#include <consensus/params.h>
#include <test/util/random.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/chaintype.h>
#include <versionbits.h>

View File

@ -105,8 +105,8 @@ public:
{
assert(num_workers > 0);
LOCK(m_mutex);
if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
m_interrupt = false; // Reset
// Create workers
m_workers.reserve(num_workers);
@ -122,6 +122,7 @@ public:
* Any remaining tasks in the queue will be processed before returning.
*
* Must be called from a controller (non-worker) thread.
* Concurrent calls to Start() will be rejected while Stop() is in progress.
*/
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
@ -138,10 +139,16 @@ public:
threads_to_join.swap(m_workers);
}
m_cv.notify_all();
// Help draining queue
while (ProcessTask()) {}
// Free resources
for (auto& worker : threads_to_join) worker.join();
// Since we currently wait for tasks completion, sanity-check empty queue
WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
// Note: m_interrupt is left true until next Start()
LOCK(m_mutex);
Assume(m_work_queue.empty());
// Re-allow Start() now that all workers have exited
m_interrupt = false;
}
enum class SubmitError {
@ -183,18 +190,19 @@ public:
* @brief Execute a single queued task synchronously.
* Removes one task from the queue and executes it on the calling thread.
*/
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
std::packaged_task<void()> task;
{
LOCK(m_mutex);
if (m_work_queue.empty()) return;
if (m_work_queue.empty()) return false;
// Pop the task
task = std::move(m_work_queue.front());
m_work_queue.pop();
}
task();
return true;
}
/**
@ -202,6 +210,10 @@ public:
*
* Wakes all worker threads so they can drain the queue and exit.
* Unlike Stop(), this function does not wait for threads to finish.
*
* Note: The next step in the pool lifecycle is calling Stop(), which
* releases any dangling resources and resets the pool state
* for shutdown or restart.
*/
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{

View File

@ -7,6 +7,7 @@
#include <policy/policy.h>
#include <primitives/transaction.h>
#include <random.h>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/translation.h>
#include <wallet/coincontrol.h>

View File

@ -4,6 +4,7 @@
#include <boost/test/unit_test.hpp>
#include <test/util/common.h>
#include <test/util/setup_common.h>
#include <util/check.h>
#include <util/fs.h>
@ -14,7 +15,6 @@
#include <wallet/walletutil.h>
#include <cstddef>
#include <fstream>
#include <memory>
#include <span>
#include <string>

View File

@ -17,6 +17,7 @@
#include <policy/policy.h>
#include <rpc/server.h>
#include <script/solver.h>
#include <test/util/common.h>
#include <test/util/logging.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>

View File

@ -4,6 +4,7 @@
#include <wallet/transaction.h>
#include <test/util/common.h>
#include <wallet/test/wallet_test_fixture.h>
#include <boost/test/unit_test.hpp>

View File

@ -4,6 +4,7 @@
#include <wallet/test/util.h>
#include <wallet/wallet.h>
#include <test/util/common.h>
#include <test/util/logging.h>
#include <test/util/setup_common.h>

View File

@ -191,7 +191,7 @@ class AuthServiceProxy():
content_type = http_response.getheader('Content-Type')
if content_type != 'application/json':
raise JSONRPCException(
{'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
{'code': -342, 'message': f"non-JSON HTTP response with \'{http_response.status} {http_response.reason}\' from server: {http_response.read().decode()}"},
http_response.status)
data = http_response.read()