http: replace WorkQueue and threads handling for ThreadPool

Replace the HTTP server's WorkQueue implementation and single threads
handling code with ThreadPool for processing HTTP requests. The
ThreadPool class encapsulates all this functionality on a reusable
class, properly unit and fuzz tested (the previous code was not
unit nor fuzz tested at all).

This cleanly separates responsibilities:
The HTTP server now focuses solely on receiving and dispatching requests,
while ThreadPool handles concurrency, queuing, and execution.
It simplifies init, shutdown and requests tracking.

This also allows us to experiment with further performance improvements at
the task queuing and execution level, such as a lock-free structure, task
prioritization or any other performance improvement in the future, without
having to deal with HTTP code that lives on a different layer.
This commit is contained in:
furszy 2026-01-28 15:08:01 -05:00
parent c323f882ed
commit 38fd85c676
No known key found for this signature in database
GPG Key ID: 5DD23CCC686AA623
2 changed files with 24 additions and 125 deletions

View File

@ -17,6 +17,7 @@
#include <util/signalinterrupt.h>
#include <util/strencodings.h>
#include <util/threadnames.h>
#include <util/threadpool.h>
#include <util/translation.h>
#include <condition_variable>
@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
/** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192;
/** HTTP request work item */
class HTTPWorkItem final : public HTTPClosure
{
public:
HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestHandler& _func):
req(std::move(_req)), path(_path), func(_func)
{
}
void operator()() override
{
func(req.get(), path);
}
std::unique_ptr<HTTPRequest> req;
private:
std::string path;
HTTPRequestHandler func;
};
/** Simple work queue for distributing work over multiple threads.
* Work items are simply callable objects.
*/
template <typename WorkItem>
class WorkQueue
{
private:
Mutex cs;
std::condition_variable cond GUARDED_BY(cs);
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
bool running GUARDED_BY(cs){true};
const size_t maxDepth;
public:
explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth)
{
}
/** Precondition: worker threads have all stopped (they have been joined).
*/
~WorkQueue() = default;
/** Enqueue a work item */
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
if (!running || queue.size() >= maxDepth) {
return false;
}
queue.emplace_back(std::unique_ptr<WorkItem>(item));
cond.notify_one();
return true;
}
/** Thread function */
void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
while (true) {
std::unique_ptr<WorkItem> i;
{
WAIT_LOCK(cs, lock);
while (running && queue.empty())
cond.wait(lock);
if (!running && queue.empty())
break;
i = std::move(queue.front());
queue.pop_front();
}
(*i)();
}
}
/** Interrupt and exit loops */
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
running = false;
cond.notify_all();
}
};
struct HTTPPathHandler
{
HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr;
static struct evhttp* eventHTTP = nullptr;
//! List of subnets to allow RPC connections from
static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
//! Handlers for (sub)paths
static GlobalMutex g_httppathhandlers_mutex;
static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_mutex);
//! Bound listening sockets
static std::vector<evhttp_bound_socket *> boundSockets;
//! Http thread pool - future: encapsulate in HttpContext
static ThreadPool g_threadpool_http("http");
static int g_max_queue_depth{100};
/**
* @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests`
@ -327,10 +252,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
// Dispatch to worker thread
if (i != iend) {
auto item = std::make_unique<HTTPWorkItem>(std::move(hreq), path, [fn = i->handler](HTTPRequest* req, const std::string& path_inner) {
if (static_cast<int>(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) {
LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
return;
}
auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
std::string err_msg;
try {
return fn(req, path_inner);
fn(req.get(), in_path);
return;
} catch (const std::exception& e) {
LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what());
err_msg = e.what();
@ -342,16 +274,9 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
req->WriteHeader("Connection", "close");
// TODO: Implement specific error formatting for the REST and JSON-RPC servers responses.
req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
return false;
});
};
assert(g_work_queue);
if (g_work_queue->Enqueue(item.get())) {
(void)item.release(); /* if true, queue took ownership */
} else {
LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
}
[[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))};
} else {
hreq->WriteReply(HTTP_NOT_FOUND);
}
@ -429,13 +354,6 @@ static bool HTTPBindAddresses(struct evhttp* http)
return !boundSockets.empty();
}
/** Simple wrapper to set thread name and run work queue */
static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue, int worker_num)
{
util::ThreadRename(strprintf("httpworker.%i", worker_num));
queue->Run();
}
/** libevent event log callback */
static void libevent_log_cb(int severity, const char *msg)
{
@ -492,10 +410,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt)
}
LogDebug(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth);
g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
@ -511,17 +428,13 @@ void UpdateHTTPServerLogging(bool enable) {
}
static std::thread g_thread_http;
static std::vector<std::thread> g_thread_http_workers;
void StartHTTPServer()
{
int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads);
g_threadpool_http.Start(rpcThreads);
g_thread_http = std::thread(ThreadHTTP, eventBase);
for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
}
}
void InterruptHTTPServer()
@ -531,21 +444,17 @@ void InterruptHTTPServer()
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
if (g_work_queue) {
g_work_queue->Interrupt();
}
// Interrupt pool after disabling requests
g_threadpool_http.Interrupt();
}
void StopHTTPServer()
{
LogDebug(BCLog::HTTP, "Stopping HTTP server\n");
if (g_work_queue) {
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
for (auto& thread : g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
}
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
g_threadpool_http.Stop();
// Unlisten sockets, these are what make the event loop running, which means
// that after this and all connections are closed the event loop will quit.
for (evhttp_bound_socket *socket : boundSockets) {
@ -573,7 +482,6 @@ void StopHTTPServer()
event_base_free(eventBase);
eventBase = nullptr;
}
g_work_queue.reset();
LogDebug(BCLog::HTTP, "Stopped HTTP server\n");
}

View File

@ -159,15 +159,6 @@ public:
*/
std::optional<std::string> GetQueryParameterFromUri(const char* uri, const std::string& key);
/** Event handler closure.
*/
class HTTPClosure
{
public:
virtual void operator()() = 0;
virtual ~HTTPClosure() = default;
};
/** Event class. This can be used either as a cross-thread trigger or as a timer.
*/
class HTTPEvent