mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-03-06 03:36:18 +00:00
1868a84451f Merge bitcoin-core/libmultiprocess#245: type-context.h: Extent cancel_mutex lock to prevent theoretical race fd4a90d3103 Merge bitcoin-core/libmultiprocess#244: ci: suppress two tidy lint issues 16dfc368640 ci: avoid bugprone-unused-return-value lint in test dacd5eda464 ci: suppress nontrivial-threadlocal lint in proxy.cpp ef96a5b2be2 doc: Comment cleanups after #240 e0f1cd76219 type-context.h: Extent cancel_mutex lock to prevent theoretical race 290702c74ce Merge bitcoin-core/libmultiprocess#240: Avoid errors from asynchronous (non-c++) clients 3a69d4755af Merge bitcoin-core/libmultiprocess#241: doc: Bump version number v7 -> v8 0174450ca2e Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer ddb5f74196f Allow simultaneous calls on same Context.thread c4762c7b513 refactor: Add ProxyServer<Thread>::post() method 0ade1b40ac5 doc: Bump version number git-subtree-dir: src/ipc/libmultiprocess git-subtree-split: 1868a84451fe1b6a00116375a5f717230bb2533e
219 lines
13 KiB
C++
219 lines
13 KiB
C++
// Copyright (c) The Bitcoin Core developers
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
#ifndef MP_PROXY_TYPE_CONTEXT_H
|
|
#define MP_PROXY_TYPE_CONTEXT_H
|
|
|
|
#include <mp/proxy-io.h>
|
|
#include <mp/util.h>
|
|
|
|
#include <kj/string.h>
|
|
|
|
namespace mp {
|
|
//! CustomBuildField overload for mp.Context call parameters, selected (via the
//! enable_if on Output) when the field being built is a Context::Builder. It
//! populates the Context struct sent with every IPC request with two thread
//! capabilities: `callbackThread`, a local Thread::Server representing the
//! current client thread so the server can call back into it, and `thread`,
//! a remote worker thread (created on first use via ThreadMap.makeThread)
//! that will execute this client's requests on the server side.
template <typename Output>
void CustomBuildField(TypeList<>,
    Priority<1>,
    ClientInvokeContext& invoke_context,
    Output&& output,
    typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
{
    auto& connection = invoke_context.connection;
    auto& thread_context = invoke_context.thread_context;

    // Create local Thread::Server object corresponding to the current thread
    // and pass a Thread::Client reference to it in the Context.callbackThread
    // field so the function being called can make callbacks to this thread.
    // Also store the Thread::Client reference in the callback_threads map so
    // future calls over this connection can reuse it. (SetThread only invokes
    // the make-thread lambda if no entry for this connection exists yet;
    // thread_context.waiter->m_mutex guards the map.)
    auto [callback_thread, _]{SetThread(
        GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
        [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};

    // Call remote ThreadMap.makeThread function so server will create a
    // dedicated worker thread to run function calls from this thread. Store the
    // Thread::Client reference it returns in the request_threads map.
    auto make_request_thread{[&]{
        // This code will only run if an IPC client call is being made for the
        // first time on this thread. After the first call, subsequent calls
        // will use the existing request thread. This code will also never run at
        // all if the current thread is a request thread created for a different
        // IPC client, because in that case PassField code (below) will have set
        // request_thread to point to the calling thread.
        auto request = connection.m_thread_map.makeThreadRequest();
        request.setName(thread_context.thread_name);
        return request.send().getResult(); // Nonblocking due to capnp request pipelining.
    }};
    auto [request_thread, _1]{SetThread(
        GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
        &connection, make_request_thread)};

    // Fill in the outgoing Context struct with the two thread capabilities
    // established above.
    auto context = output.init();
    context.setThread(request_thread->second->m_client);
    context.setCallbackThread(callback_thread->second->m_client);
}
|
|
|
|
//! PassField override for mp.Context arguments. Return asynchronously and call
//! function on other thread found in context.
//!
//! Instead of running `fn` inline, this looks up the ProxyServer<Thread>
//! worker thread named by the client in Context.thread, posts an `invoke`
//! lambda to it, and returns a kj::Promise that resolves when the call
//! completes. The lambda also registers the client's Context.callbackThread
//! in the thread-local request_threads map so nested calls back to the client
//! run on the client thread that is waiting for this request. Cancellation is
//! coordinated through a CancelMonitor plus a local cancel_mutex; the
//! lock-ordering comments below are load-bearing — do not reorder.
template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
    typename std::enable_if<
        std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
        kj::Promise<typename ServerContext::CallContext>>::type
{
    const auto& params = server_context.call_context.getParams();
    Context::Reader context_arg = Accessor::get(params);
    auto& server = server_context.proxy_server;
    int req = server_context.req;
    // Keep a reference to the ProxyServer instance by assigning it to the self
    // variable. ProxyServer instances are reference-counted and if the client
    // drops its reference and the IPC call is canceled, this variable keeps the
    // instance alive until the method finishes executing. The self variable
    // needs to be destroyed on the event loop thread so it is freed in a sync()
    // call below.
    auto self = server.thisCap();
    // Lambda executed on the worker thread chosen by the client. It captures
    // call_context by move, so the params/results structs stay alive for the
    // duration of the call unless the event loop cancels the request.
    auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
        MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
        const auto& params = call_context.getParams();
        Context::Reader context_arg = Accessor::get(params);
        ServerContext server_context{server, call_context, req};
        {
            // Before invoking the function, store a reference to the
            // callbackThread provided by the client in the
            // thread_local.request_threads map. This way, if this
            // server thread needs to execute any RPCs that call back to
            // the client, they will happen on the same client thread
            // that is waiting for this function, just like what would
            // happen if this were a normal function call made on the
            // local stack.
            //
            // If the request_threads map already has an entry for this
            // connection, it will be left unchanged, and it indicates
            // that the current thread is an RPC client thread which is
            // in the middle of an RPC call, and the current RPC call is
            // a nested call from the remote thread handling that RPC
            // call. In this case, the callbackThread value should point
            // to the same thread already in the map, so there is no
            // need to update the map.
            auto& thread_context = g_thread_context;
            auto& request_threads = thread_context.request_threads;
            ConnThread request_thread;
            bool inserted{false};
            // cancel_mutex serializes this execution thread against the
            // on_cancel callback that the event loop may run; it is held by
            // cancel_lock for the entire fn.invoke() call below.
            Mutex cancel_mutex;
            Lock cancel_lock{cancel_mutex};
            server_context.cancel_lock = &cancel_lock;
            // Run cancellation setup on the event loop thread so it cannot
            // race with the CancelMonitor being triggered.
            server.m_context.loop->sync([&] {
                // Detect request being canceled before it executes.
                if (cancel_monitor.m_canceled) {
                    server_context.request_canceled = true;
                    return;
                }
                // Detect request being canceled while it executes.
                assert(!cancel_monitor.m_on_cancel);
                cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
                    MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
                    // Lock cancel_mutex here to block the event loop
                    // thread and prevent it from deleting the request's
                    // params and response structs while the execution
                    // thread is accessing them. Because this lock is
                    // released before the event loop thread does delete
                    // the structs, the mutex does not provide any
                    // protection from the event loop deleting the
                    // structs _before_ the execution thread acquires
                    // it. So in addition to locking the mutex, the
                    // execution thread always checks request_canceled
                    // as well before accessing the structs.
                    Lock cancel_lock{cancel_mutex};
                    server_context.request_canceled = true;
                };
                // Update request_threads map if not canceled.
                std::tie(request_thread, inserted) = SetThread(
                    GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
                    [&] { return context_arg.getCallbackThread(); });
            });

            // If an entry was inserted into the request_threads map,
            // remove it after calling fn.invoke. If an entry was not
            // inserted, one already existed, meaning this must be a
            // recursive call (IPC call calling back to the caller which
            // makes another IPC call), so avoid modifying the map.
            const bool erase_thread{inserted};
            KJ_DEFER(
                // Release the cancel lock before calling loop->sync and
                // waiting for the event loop thread, because if a
                // cancellation happened, it needs to run the on_cancel
                // callback above. It's safe to release cancel_lock at
                // this point because the fn.invoke() call below will be
                // finished and no longer accessing the params or
                // results structs.
                cancel_lock.m_lock.unlock();
                // Erase the request_threads entry on the event loop
                // thread with loop->sync(), so if the connection is
                // broken there is not a race between this thread and
                // the disconnect handler trying to destroy the thread
                // client object.
                server.m_context.loop->sync([&] {
                    // Dispose of the `self` capability on the event loop
                    // thread, as required (see comment on `self` above).
                    auto self_dispose{kj::mv(self)};
                    if (erase_thread) {
                        // Look up the thread again without using existing
                        // iterator since entry may no longer be there after
                        // a disconnect. Destroy node after releasing
                        // Waiter::m_mutex, so the ProxyClient<Thread>
                        // destructor is able to use EventLoop::mutex
                        // without violating lock order.
                        ConnThreads::node_type removed;
                        {
                            Lock lock(thread_context.waiter->m_mutex);
                            removed = request_threads.extract(server.m_context.connection);
                        }
                    }
                });
            );
            // Skip execution entirely if the request was already canceled;
            // otherwise run fn, logging (and rethrowing) unexpected
            // exceptions while treating InterruptException as a clean stop.
            if (server_context.request_canceled) {
                MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
            } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
                try {
                    fn.invoke(server_context, args...);
                } catch (const InterruptException& e) {
                    MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
                }
            })) {
                MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
                throw kj::mv(*exception);
            }
            // End of scope: if KJ_DEFER was reached, it runs here
        }
        return call_context;
    };

    // Lookup Thread object specified by the client. The specified thread should
    // be a local Thread::Server object, but it needs to be looked up
    // asynchronously with getLocalServer().
    auto thread_client = context_arg.getThread();
    auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
        .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
            // Assuming the thread object is found, pass it a pointer to the
            // `invoke` lambda above which will invoke the function on that
            // thread.
            KJ_IF_MAYBE (thread_server, perhaps) {
                auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
                MP_LOG(*server.m_context.loop, Log::Debug)
                    << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
                return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
            } else {
                MP_LOG(*server.m_context.loop, Log::Error)
                    << "IPC server error request #" << req << ", missing thread to execute request";
                throw std::runtime_error("invalid thread handle");
            }
        });
    // Use connection m_canceler object to cancel the result promise if the
    // connection is destroyed. (By default Cap'n Proto does not cancel requests
    // on disconnect, since it's possible clients might want to make requests
    // and immediately disconnect without waiting for results, but not want the
    // requests to be canceled.)
    return server.m_context.connection->m_canceler.wrap(kj::mv(result));
}
|
|
} // namespace mp
|
|
|
|
#endif // MP_PROXY_TYPE_CONTEXT_H
|