From 79571b918130e66436b2d43489835c38bb3ae3e3 Mon Sep 17 00:00:00 2001
From: Andrew Toth <andrewtoth@protonmail.com>
Date: Thu, 12 Feb 2026 12:16:50 -0500
Subject: [PATCH] threadpool: add ranged Submit overload

Co-authored-by: l0rinc <pap.lorinc@gmail.com>
---
 src/test/threadpool_tests.cpp | 54 ++++++++++++++++++++++++++++++++++++
 src/util/threadpool.h         | 56 ++++++++++++++++++++++++++++++++++---
 2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
index c5e3974f7a1..9af5348cd94 100644
--- a/src/test/threadpool_tests.cpp
+++ b/src/test/threadpool_tests.cpp
@@ -11,7 +11,11 @@
 
 #include <atomic>
 #include <chrono>
+
+#include <array>
+#include <functional>
 #include <future>
+#include <ranges>
 #include <vector>
 
 // General test values
@@ -41,6 +45,7 @@ struct ThreadPoolFixture {
 // 11) Start() must not cause a deadlock when called during Stop().
 // 12) Ensure queued tasks complete after Interrupt().
 // 13) Ensure the Stop() calling thread helps drain the queue.
+// 14) Submit range of tasks in one lock acquisition.
 
 BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
 #define WAIT_FOR(futures) \
@@ -107,6 +112,11 @@ BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
     res = threadPool.Submit(fn_empty);
     BOOST_CHECK(!res);
     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
+
+    std::vector<std::function<void()>> tasks;
+    const auto range_res{threadPool.Submit(std::move(tasks))};
+    BOOST_CHECK(!range_res);
+    BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
 }
 
 // Test 1, submit tasks and verify completion
@@ -323,6 +333,11 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
     BOOST_CHECK(!res);
     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
 
+    std::vector<std::function<void()>> tasks;
+    const auto range_res{threadPool.Submit(std::move(tasks))};
+    BOOST_CHECK(!range_res);
+    BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
+
    // Reset pool
     threadPool.Stop();
 
@@ -442,4 +457,43 @@ BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
     WAIT_FOR(blocking_tasks);
 }
 
+// Test 14, submit range of tasks in one lock acquisition
+BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
+{
+    constexpr int32_t num_tasks{50};
+
+    ThreadPool threadPool{POOL_NAME};
+    threadPool.Start(NUM_WORKERS_DEFAULT);
+    std::atomic_int32_t sum{0};
+    const auto square{[&sum](int32_t i) {
+        sum.fetch_add(i, std::memory_order_relaxed);
+        return i * i;
+    }};
+
+    std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
+    std::vector<std::function<int32_t()>> vector_tasks;
+    vector_tasks.reserve(static_cast<size_t>(num_tasks));
+    for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
+        array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
+        vector_tasks.emplace_back([i, square] { return square(i); });
+    }
+
+    auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
+    BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
+    std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
+    BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
+
+    auto squares_sum{0};
+    for (auto& future : futures) {
+        squares_sum += future.get();
+    }
+
+    // 2x Gauss sum.
+    const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
+    const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
+    BOOST_CHECK_EQUAL(sum, expected_sum);
+    BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index ca39afd4ab2..ff0590f860a 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -7,8 +7,8 @@
 
 #include <logging.h>
 #include <sync.h>
-#include <util/string.h>
 #include <util/expected.h>
+#include <util/string.h>
 #include <util/threadnames.h>
 
 #include <condition_variable>
@@ -16,6 +16,7 @@
 #include <future>
 #include <memory>
 #include <queue>
+#include <ranges>
 #include <stdexcept>
 #include <string>
 #include <thread>
@@ -156,6 +157,15 @@ public:
         Interrupted,
     };
 
+    template <typename F>
+    using Future = std::future<std::invoke_result_t<F>>;
+
+    template <typename R>
+    using RangeFuture = Future<std::ranges::range_value_t<R>>;
+
+    template <typename F>
+    using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
+
     /**
      * @brief Enqueues a new task for asynchronous execution.
      *
@@ -171,9 +181,9 @@ public:
      * uncaught exceptions, as they would otherwise be silently discarded.
      */
     template <typename F>
-    [[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
-        std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
+        PackagedTask<F> task{std::forward<F>(fn)};
         auto future{task.get_future()};
         {
             LOCK(m_mutex);
@@ -186,6 +196,46 @@ public:
         return {std::move(future)};
     }
 
+    /**
+     * @brief Enqueues a range of tasks for asynchronous execution.
+     *
+     * @param fns Callables to execute asynchronously.
+     * @return On success, a vector of futures containing each element of fns's result in order.
+     *         On failure, an error indicating why the range was rejected:
+     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
+     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
+     *
+     * This is more efficient when submitting many tasks at once, since
+     * the queue lock is only taken once internally and all worker threads are
+     * notified. For single tasks, Submit() is preferred since only one worker
+     * thread is notified.
+     *
+     * Thread-safe: Can be called from any thread, including within submitted callables.
+     *
+     * @warning Ignoring the returned futures requires guarding tasks against
+     * uncaught exceptions, as they would otherwise be silently discarded.
+     */
+    template <std::ranges::sized_range R>
+        requires(!std::is_lvalue_reference_v<R>)
+    [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    {
+        std::vector<RangeFuture<R>> futures;
+        futures.reserve(std::ranges::size(fns));
+
+        {
+            LOCK(m_mutex);
+            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
+            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
+            for (auto&& fn : fns) {
+                PackagedTask<std::ranges::range_value_t<R>> task{std::move(fn)};
+                futures.emplace_back(task.get_future());
+                m_work_queue.emplace(std::move(task));
+            }
+        }
+        m_cv.notify_all();
+        return {std::move(futures)};
+    }
+
     /**
      * @brief Execute a single queued task synchronously.
      * Removes one task from the queue and executes it on the calling thread.