mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-03-16 16:32:47 +00:00
Merge bitcoin/bitcoin#34576: threadpool: add ranged Submit overload
79571b918130e66436b2d43489835c38bb3ae3e3 threadpool: add ranged Submit overload (Andrew Toth) Pull request description: The current `ThreadPool::Submit` is not very efficient when we have a use case where we need to submit multiple tasks immediately. The `Submit` method must take the lock for each task, and notifies only a single worker thread. This will cause lock contention with the awakened worker thread trying to take the lock and the caller trying to submit the next task. Introduce a `Submit` overload, which takes the lock once and submits a range of tasks, then notifies all worker threads after the lock is released. This is needed for #31132 to be able to use `ThreadPool`. ACKs for top commit: l0rinc: ACK 79571b918130e66436b2d43489835c38bb3ae3e3 rkrux: ACK 79571b9 sedited: Re-ACK 79571b918130e66436b2d43489835c38bb3ae3e3 willcl-ark: ACK 79571b918130e66436b2d43489835c38bb3ae3e3 Tree-SHA512: 1fbe0c150f01b9ea5be3459cd10b817045af52eaf6f14a1a298a68853890da4033c1b21bdc6f995bb55029fb4ab536e9dbf58d98e2e1e12b25298fa3470b4ba6
This commit is contained in:
commit
524aa1e533
@ -11,7 +11,11 @@
|
||||
#include <util/time.h>
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <functional>
|
||||
#include <latch>
|
||||
#include <ranges>
|
||||
#include <semaphore>
|
||||
|
||||
// General test values
|
||||
@ -41,6 +45,7 @@ struct ThreadPoolFixture {
|
||||
// 11) Start() must not cause a deadlock when called during Stop().
|
||||
// 12) Ensure queued tasks complete after Interrupt().
|
||||
// 13) Ensure the Stop() calling thread helps drain the queue.
|
||||
// 14) Submit range of tasks in one lock acquisition.
|
||||
BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
|
||||
|
||||
#define WAIT_FOR(futures) \
|
||||
@ -107,6 +112,11 @@ BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
|
||||
res = threadPool.Submit(fn_empty);
|
||||
BOOST_CHECK(!res);
|
||||
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
|
||||
|
||||
std::vector<std::function<void()>> tasks;
|
||||
const auto range_res{threadPool.Submit(std::move(tasks))};
|
||||
BOOST_CHECK(!range_res);
|
||||
BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
|
||||
}
|
||||
|
||||
// Test 1, submit tasks and verify completion
|
||||
@ -323,6 +333,11 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
|
||||
BOOST_CHECK(!res);
|
||||
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
|
||||
|
||||
std::vector<std::function<void()>> tasks;
|
||||
const auto range_res{threadPool.Submit(std::move(tasks))};
|
||||
BOOST_CHECK(!range_res);
|
||||
BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
|
||||
|
||||
// Reset pool
|
||||
threadPool.Stop();
|
||||
|
||||
@ -442,4 +457,43 @@ BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
|
||||
WAIT_FOR(blocking_tasks);
|
||||
}
|
||||
|
||||
// Test 14, submit range of tasks in one lock acquisition
|
||||
BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
|
||||
{
|
||||
constexpr int32_t num_tasks{50};
|
||||
|
||||
ThreadPool threadPool{POOL_NAME};
|
||||
threadPool.Start(NUM_WORKERS_DEFAULT);
|
||||
std::atomic_int32_t sum{0};
|
||||
const auto square{[&sum](int32_t i) {
|
||||
sum.fetch_add(i, std::memory_order_relaxed);
|
||||
return i * i;
|
||||
}};
|
||||
|
||||
std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
|
||||
std::vector<std::function<int32_t()>> vector_tasks;
|
||||
vector_tasks.reserve(static_cast<size_t>(num_tasks));
|
||||
for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
|
||||
array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
|
||||
vector_tasks.emplace_back([i, square] { return square(i); });
|
||||
}
|
||||
|
||||
auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
|
||||
BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
|
||||
std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
|
||||
BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
|
||||
|
||||
auto squares_sum{0};
|
||||
for (auto& future : futures) {
|
||||
squares_sum += future.get();
|
||||
}
|
||||
|
||||
// 2x Gauss sum.
|
||||
const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
|
||||
const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
|
||||
BOOST_CHECK_EQUAL(sum, expected_sum);
|
||||
BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
|
||||
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@ -7,8 +7,8 @@
|
||||
|
||||
#include <sync.h>
|
||||
#include <tinyformat.h>
|
||||
#include <util/expected.h>
|
||||
#include <util/check.h>
|
||||
#include <util/expected.h>
|
||||
#include <util/thread.h>
|
||||
|
||||
#include <algorithm>
|
||||
@ -16,6 +16,7 @@
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <queue>
|
||||
#include <ranges>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
@ -156,6 +157,15 @@ public:
|
||||
Interrupted,
|
||||
};
|
||||
|
||||
template <class F>
|
||||
using Future = std::future<std::invoke_result_t<F>>;
|
||||
|
||||
template <class R>
|
||||
using RangeFuture = Future<std::ranges::range_reference_t<R>>;
|
||||
|
||||
template <class F>
|
||||
using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
|
||||
|
||||
/**
|
||||
* @brief Enqueues a new task for asynchronous execution.
|
||||
*
|
||||
@ -171,9 +181,9 @@ public:
|
||||
* uncaught exceptions, as they would otherwise be silently discarded.
|
||||
*/
|
||||
template <class F>
|
||||
[[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
[[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
|
||||
PackagedTask<F> task{std::forward<F>(fn)};
|
||||
auto future{task.get_future()};
|
||||
{
|
||||
LOCK(m_mutex);
|
||||
@ -186,6 +196,46 @@ public:
|
||||
return {std::move(future)};
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Enqueues a range of tasks for asynchronous execution.
|
||||
*
|
||||
* @param fns Callables to execute asynchronously.
|
||||
* @return On success, a vector of futures containing each element of fns's result in order.
|
||||
* On failure, an error indicating why the range was rejected:
|
||||
* - SubmitError::Inactive: Pool has no workers (never started or already stopped).
|
||||
* - SubmitError::Interrupted: Pool task acceptance has been interrupted.
|
||||
*
|
||||
* This is more efficient when submitting many tasks at once, since
|
||||
* the queue lock is only taken once internally and all worker threads are
|
||||
* notified. For single tasks, Submit() is preferred since only one worker
|
||||
* thread is notified.
|
||||
*
|
||||
* Thread-safe: Can be called from any thread, including within submitted callables.
|
||||
*
|
||||
* @warning Ignoring the returned futures requires guarding tasks against
|
||||
* uncaught exceptions, as they would otherwise be silently discarded.
|
||||
*/
|
||||
template <std::ranges::sized_range R>
|
||||
requires(!std::is_lvalue_reference_v<R>)
|
||||
[[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
std::vector<RangeFuture<R>> futures;
|
||||
futures.reserve(std::ranges::size(fns));
|
||||
|
||||
{
|
||||
LOCK(m_mutex);
|
||||
if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
|
||||
if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
|
||||
for (auto&& fn : fns) {
|
||||
PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
|
||||
futures.emplace_back(task.get_future());
|
||||
m_work_queue.emplace(std::move(task));
|
||||
}
|
||||
}
|
||||
m_cv.notify_all();
|
||||
return {std::move(futures)};
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Execute a single queued task synchronously.
|
||||
* Removes one task from the queue and executes it on the calling thread.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user