diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 61df454af8e..b6a74d1c015 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -49,83 +50,6 @@ using common::InvalidPortErrMsg; /** Maximum size of http request (request line + headers) */ static const size_t MAX_HEADERS_SIZE = 8192; -/** HTTP request work item */ -class HTTPWorkItem final : public HTTPClosure -{ -public: - HTTPWorkItem(std::unique_ptr _req, const std::string &_path, const HTTPRequestHandler& _func): - req(std::move(_req)), path(_path), func(_func) - { - } - void operator()() override - { - func(req.get(), path); - } - - std::unique_ptr req; - -private: - std::string path; - HTTPRequestHandler func; -}; - -/** Simple work queue for distributing work over multiple threads. - * Work items are simply callable objects. - */ -template -class WorkQueue -{ -private: - Mutex cs; - std::condition_variable cond GUARDED_BY(cs); - std::deque> queue GUARDED_BY(cs); - bool running GUARDED_BY(cs){true}; - const size_t maxDepth; - -public: - explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth) - { - } - /** Precondition: worker threads have all stopped (they have been joined). - */ - ~WorkQueue() = default; - /** Enqueue a work item */ - bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - if (!running || queue.size() >= maxDepth) { - return false; - } - queue.emplace_back(std::unique_ptr(item)); - cond.notify_one(); - return true; - } - /** Thread function */ - void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - while (true) { - std::unique_ptr i; - { - WAIT_LOCK(cs, lock); - while (running && queue.empty()) - cond.wait(lock); - if (!running && queue.empty()) - break; - i = std::move(queue.front()); - queue.pop_front(); - } - (*i)(); - } - } - /** Interrupt and exit loops */ - void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs) - { - LOCK(cs); - running = false; - cond.notify_all(); - } -}; - struct HTTPPathHandler { HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler): @@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr; static struct evhttp* eventHTTP = nullptr; //! List of subnets to allow RPC connections from static std::vector rpc_allow_subnets; -//! Work queue for handling longer requests off the event loop thread -static std::unique_ptr> g_work_queue{nullptr}; //! Handlers for (sub)paths static GlobalMutex g_httppathhandlers_mutex; static std::vector pathHandlers GUARDED_BY(g_httppathhandlers_mutex); //! Bound listening sockets static std::vector boundSockets; +//! Http thread pool - future: encapsulate in HttpContext +static ThreadPool g_threadpool_http("http"); +static int g_max_queue_depth{100}; /** * @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests` @@ -327,10 +252,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg) // Dispatch to worker thread if (i != iend) { - auto item = std::make_unique(std::move(hreq), path, [fn = i->handler](HTTPRequest* req, const std::string& path_inner) { + if (static_cast(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) { + LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); + hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + return; + } + + auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() { std::string err_msg; try { - return fn(req, path_inner); + fn(req.get(), in_path); + return; } catch (const std::exception& e) { LogWarning("Unexpected error while processing request for '%s'. Error msg: '%s'", req->GetURI(), e.what()); err_msg = e.what(); @@ -342,16 +274,9 @@ static void http_request_cb(struct evhttp_request* req, void* arg) req->WriteHeader("Connection", "close"); // TODO: Implement specific error formatting for the REST and JSON-RPC servers responses. req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg); - return false; - }); + }; - assert(g_work_queue); - if (g_work_queue->Enqueue(item.get())) { - (void)item.release(); /* if true, queue took ownership */ - } else { - LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); - } + [[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))}; } else { hreq->WriteReply(HTTP_NOT_FOUND); } @@ -429,13 +354,6 @@ static bool HTTPBindAddresses(struct evhttp* http) return !boundSockets.empty(); } -/** Simple wrapper to set thread name and run work queue */ -static void HTTPWorkQueueRun(WorkQueue* queue, int worker_num) -{ - util::ThreadRename(strprintf("httpworker.%i", worker_num)); - queue->Run(); -} - /** libevent event log callback */ static void libevent_log_cb(int severity, const char *msg) { @@ -492,10 +410,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt) } LogDebug(BCLog::HTTP, "Initialized HTTP server\n"); - int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth); + g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); + LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth); - g_work_queue = std::make_unique>(workQueueDepth); // transfer ownership to eventBase/HTTP via .release() eventBase = base_ctr.release(); eventHTTP = http_ctr.release(); @@ -511,17 +428,13 @@ void UpdateHTTPServerLogging(bool enable) { } static std::thread g_thread_http; -static std::vector g_thread_http_workers; void StartHTTPServer() { int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads); + g_threadpool_http.Start(rpcThreads); g_thread_http = std::thread(ThreadHTTP, eventBase); - - for (int i = 0; i < rpcThreads; i++) { - g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i); - } } void InterruptHTTPServer() @@ -531,21 +444,17 @@ void InterruptHTTPServer() // Reject requests on current connections evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); } - if (g_work_queue) { - g_work_queue->Interrupt(); - } + // Interrupt pool after disabling requests + g_threadpool_http.Interrupt(); } void StopHTTPServer() { LogDebug(BCLog::HTTP, "Stopping HTTP server\n"); - if (g_work_queue) { - LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); - for (auto& thread : g_thread_http_workers) { - thread.join(); - } - g_thread_http_workers.clear(); - } + + LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); + g_threadpool_http.Stop(); + // Unlisten sockets, these are what make the event loop running, which means // that after this and all connections are closed the event loop will quit. for (evhttp_bound_socket *socket : boundSockets) { @@ -573,7 +482,6 @@ void StopHTTPServer() event_base_free(eventBase); eventBase = nullptr; } - g_work_queue.reset(); LogDebug(BCLog::HTTP, "Stopped HTTP server\n"); } diff --git a/src/httpserver.h b/src/httpserver.h index 1ef3aaeb0ab..5461480d44f 100644 --- a/src/httpserver.h +++ b/src/httpserver.h @@ -159,15 +159,6 @@ public: */ std::optional GetQueryParameterFromUri(const char* uri, const std::string& key); -/** Event handler closure. - */ -class HTTPClosure -{ -public: - virtual void operator()() = 0; - virtual ~HTTPClosure() = default; -}; - /** Event class. This can be used either as a cross-thread trigger or as a timer. */ class HTTPEvent