From 38fd85c676a072ebf256e806beda9d7533790baa Mon Sep 17 00:00:00 2001 From: furszy Date: Wed, 28 Jan 2026 15:08:01 -0500 Subject: [PATCH] http: replace WorkQueue and threads handling for ThreadPool Replace the HTTP server's WorkQueue implementation and single threads handling code with ThreadPool for processing HTTP requests. The ThreadPool class encapsulates all this functionality on a reusable class, properly unit and fuzz tested (the previous code was not unit nor fuzz tested at all). This cleanly separates responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution. It simplifies init, shutdown and requests tracking. This also allows us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free structure, task prioritization or any other performance improvement in the future, without having to deal with HTTP code that lives on a different layer. --- src/httpserver.cpp | 140 ++++++++------------------------------------- src/httpserver.h | 9 --- 2 files changed, 24 insertions(+), 125 deletions(-) diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 61df454af8e..b6a74d1c015 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -49,83 +50,6 @@ using common::InvalidPortErrMsg; /** Maximum size of http request (request line + headers) */ static const size_t MAX_HEADERS_SIZE = 8192; -/** HTTP request work item */ -class HTTPWorkItem final : public HTTPClosure -{ -public: - HTTPWorkItem(std::unique_ptr _req, const std::string &_path, const HTTPRequestHandler& _func): - req(std::move(_req)), path(_path), func(_func) - { - } - void operator()() override - { - func(req.get(), path); - } - - std::unique_ptr req; - -private: - std::string path; - HTTPRequestHandler func; -}; - -/** Simple work queue for distributing work over multiple threads. - * Work items are simply callable objects. - */ -template -class WorkQueue -{ -private: - Mutex cs; - std::condition_variable cond GUARDED_BY(cs); - std::deque> queue GUARDED_BY(cs); - bool running GUARDED_BY(cs){true}; - const size_t maxDepth; - -public: - explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth) - { - } - /** Precondition: worker threads have all stopped (they have been joined). - */ - ~WorkQueue() = default; - /** Enqueue a work item */ - bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - if (!running || queue.size() >= maxDepth) { - return false; - } - queue.emplace_back(std::unique_ptr(item)); - cond.notify_one(); - return true; - } - /** Thread function */ - void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - while (true) { - std::unique_ptr i; - { - WAIT_LOCK(cs, lock); - while (running && queue.empty()) - cond.wait(lock); - if (!running && queue.empty()) - break; - i = std::move(queue.front()); - queue.pop_front(); - } - (*i)(); - } - } - /** Interrupt and exit loops */ - void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - running = false; - cond.notify_all(); - } -}; - struct HTTPPathHandler { HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler): @@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr; static struct evhttp* eventHTTP = nullptr; //! List of subnets to allow RPC connections from static std::vector rpc_allow_subnets; -//! Work queue for handling longer requests off the event loop thread -static std::unique_ptr> g_work_queue{nullptr}; //! Handlers for (sub)paths static GlobalMutex g_httppathhandlers_mutex; static std::vector pathHandlers GUARDED_BY(g_httppathhandlers_mutex); //! Bound listening sockets static std::vector boundSockets; +//! Http thread pool - future: encapsulate in HttpContext +static ThreadPool g_threadpool_http("http"); +static int g_max_queue_depth{100}; /** * @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests` @@ -327,10 +252,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg) // Dispatch to worker thread if (i != iend) { - auto item = std::make_unique(std::move(hreq), path, [fn = i->handler](HTTPRequest* req, const std::string& path_inner) { + if (static_cast(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) { + LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); + hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + return; + } + + auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() { std::string err_msg; try { - return fn(req, path_inner); + fn(req.get(), in_path); + return; } catch (const std::exception& e) { LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what()); err_msg = e.what(); @@ -342,16 +274,9 @@ static void http_request_cb(struct evhttp_request* req, void* arg) req->WriteHeader("Connection", "close"); // TODO: Implement specific error formatting for the REST and JSON-RPC servers responses. req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg); - return false; - }); + }; - assert(g_work_queue); - if (g_work_queue->Enqueue(item.get())) { - (void)item.release(); /* if true, queue took ownership */ - } else { - LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); - } + [[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))}; } else { hreq->WriteReply(HTTP_NOT_FOUND); } @@ -429,13 +354,6 @@ static bool HTTPBindAddresses(struct evhttp* http) return !boundSockets.empty(); } -/** Simple wrapper to set thread name and run work queue */ -static void HTTPWorkQueueRun(WorkQueue* queue, int worker_num) -{ - util::ThreadRename(strprintf("httpworker.%i", worker_num)); - queue->Run(); -} - /** libevent event log callback */ static void libevent_log_cb(int severity, const char *msg) { @@ -492,10 +410,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt) } LogDebug(BCLog::HTTP, "Initialized HTTP server\n"); - int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth); + g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); + LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth); - g_work_queue = std::make_unique>(workQueueDepth); // transfer ownership to eventBase/HTTP via .release() eventBase = base_ctr.release(); eventHTTP = http_ctr.release(); @@ -511,17 +428,13 @@ void UpdateHTTPServerLogging(bool enable) { } static std::thread g_thread_http; -static std::vector g_thread_http_workers; void StartHTTPServer() { int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads); + g_threadpool_http.Start(rpcThreads); g_thread_http = std::thread(ThreadHTTP, eventBase); - - for (int i = 0; i < rpcThreads; i++) { - g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i); - } } void InterruptHTTPServer() @@ -531,21 +444,17 @@ void InterruptHTTPServer() // Reject requests on current connections evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); } - if (g_work_queue) { - g_work_queue->Interrupt(); - } + // Interrupt pool after disabling requests + g_threadpool_http.Interrupt(); } void StopHTTPServer() { LogDebug(BCLog::HTTP, "Stopping HTTP server\n"); - if (g_work_queue) { - LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); - for (auto& thread : g_thread_http_workers) { - thread.join(); - } - g_thread_http_workers.clear(); - } + + LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); + g_threadpool_http.Stop(); + // Unlisten sockets, these are what make the event loop running, which means // that after this and all connections are closed the event loop will quit. for (evhttp_bound_socket *socket : boundSockets) { @@ -573,7 +482,6 @@ void StopHTTPServer() event_base_free(eventBase); eventBase = nullptr; } - g_work_queue.reset(); LogDebug(BCLog::HTTP, "Stopped HTTP server\n"); } diff --git a/src/httpserver.h b/src/httpserver.h index 1ef3aaeb0ab..5461480d44f 100644 --- a/src/httpserver.h +++ b/src/httpserver.h @@ -159,15 +159,6 @@ public: */ std::optional GetQueryParameterFromUri(const char* uri, const std::string& key); -/** Event handler closure. - */ -class HTTPClosure -{ -public: - virtual void operator()() = 0; - virtual ~HTTPClosure() = default; -}; - /** Event class. This can be used either as a cross-thread trigger or as a timer. */ class HTTPEvent