Merge commit 'b7ca3bf061b51108d155283c1ad503c0af7eab0d' into pr/subtree-8

This commit is contained in:
Ryan Ofsky 2026-02-25 11:08:49 -05:00
commit cb15f5a317
12 changed files with 381 additions and 91 deletions

View File

@ -7,40 +7,55 @@ Library versions are tracked with simple
Versioning policy is described in the [version.h](../include/mp/version.h)
include.
## v7
## v8
- Current unstable version in master branch.
- Intended to be compatible with Bitcoin Core 30.1 and future releases.
## v7.0
- Adds SpawnProcess race fix, cmake `target_capnp_sources` option, ci and documentation improvements.
- Used in Bitcoin Core master branch, pulled in by [#34363](https://github.com/bitcoin/bitcoin/pull/34363).
## v7.0-pre1
- Adds support for log levels to reduce logging and "thread busy" error to avoid a crash on misuse. Fixes intermittent mptest hang and makes other minor improvements.
- Used in Bitcoin Core 30.1 and 30.2 releases and 30.x branch. Pulled in by [#33412](https://github.com/bitcoin/bitcoin/pull/33412), [#33518](https://github.com/bitcoin/bitcoin/pull/33518), and [#33519](https://github.com/bitcoin/bitcoin/pull/33519).
## v6.0
- `EventLoop::addClient` and `EventLoop::removeClient` methods dropped,
- Adds fixes for unclean shutdowns and thread sanitizer issues and adds CI scripts.
- Drops `EventLoop::addClient` and `EventLoop::removeClient` methods,
requiring clients to use new `EventLoopRef` class instead.
- Compatible with Bitcoin Core 30.0 release.
- Used in Bitcoin Core 30.0 release. Pulled in by [#31741](https://github.com/bitcoin/bitcoin/pull/31741), [#32641](https://github.com/bitcoin/bitcoin/pull/32641), [#32345](https://github.com/bitcoin/bitcoin/pull/32345), [#33241](https://github.com/bitcoin/bitcoin/pull/33241), and [#33322](https://github.com/bitcoin/bitcoin/pull/33322).
## v5.0
- Adds build improvements and tidy/warning fixes.
- Used in Bitcoin Core 29 releases, pulled in by [#31945](https://github.com/bitcoin/bitcoin/pull/31945).
## v5.0-pre1
- Adds many improvements to Bitcoin Core mining interface: splitting up type headers, fixing shutdown bugs, adding subtree build support.
- Broke up `proxy-types.h` into `type-*.h` files requiring clients to explicitly
include overloads needed for C++ ↔️ Cap'n Proto type conversions.
- Now requires C++ 20 support.
- Compatible with Bitcoin Core 29 releases.
- Minimum required version for Bitcoin Core 29 releases, pulled in by [#30509](https://github.com/bitcoin/bitcoin/pull/30509), [#30510](https://github.com/bitcoin/bitcoin/pull/30510), [#31105](https://github.com/bitcoin/bitcoin/pull/31105), [#31740](https://github.com/bitcoin/bitcoin/pull/31740).
## v4.0
- Added better cmake support, installing cmake package files so clients do not
need to use pkgconfig.
- Compatible with Bitcoin Core 28 releases.
- Used in Bitcoin Core 28 releases, pulled in by [#30490](https://github.com/bitcoin/bitcoin/pull/30490) and [#30513](https://github.com/bitcoin/bitcoin/pull/30513).
## v3.0
- Dropped compatibility with Cap'n Proto versions before 0.7.
- Compatible with Bitcoin Core 27 releases.
- Used in Bitcoin Core 27 releases, pulled in by [#28735](https://github.com/bitcoin/bitcoin/pull/28735), [#28907](https://github.com/bitcoin/bitcoin/pull/28907), and [#29276](https://github.com/bitcoin/bitcoin/pull/29276).
## v2.0
- Changed `PassField` function signature.
- Now requires C++17 support.
- Compatible with Bitcoin Core 25 and 26 releases.
- Used in Bitcoin Core 25 and 26 releases, pulled in by [#26672](https://github.com/bitcoin/bitcoin/pull/26672).
## v1.0
- Dropped hardcoded includes in generated files, now requiring `include` and
`includeTypes` annotations.
- Compatible with Bitcoin Core 22, 23, and 24 releases.
- Used in Bitcoin Core 22, 23, and 24 releases, pulled in by [#19160](https://github.com/bitcoin/bitcoin/pull/19160).
## v0.0
- Initial version used in a downstream release.
- Compatible with Bitcoin Core 21 releases.
- Used in Bitcoin Core 21 releases, pulled in by [#16367](https://github.com/bitcoin/bitcoin/pull/16367), [#18588](https://github.com/bitcoin/bitcoin/pull/18588), and [#18677](https://github.com/bitcoin/bitcoin/pull/18677).

View File

@ -48,6 +48,19 @@ struct ServerInvokeContext : InvokeContext
ProxyServer& proxy_server;
CallContext& call_context;
int req;
//! For IPC methods that execute asynchronously, not on the event-loop
//! thread: lock preventing the event-loop thread from freeing the params or
//! results structs if the request is canceled while the worker thread is
//! reading params (`call_context.getParams()`) or writing results
//! (`call_context.getResults()`).
Lock* cancel_lock{nullptr};
//! For IPC methods that execute asynchronously, not on the event-loop
//! thread, this is set to true if the IPC call was canceled by the client
//! or canceled by a disconnection. If the call runs on the event-loop
//! thread, it can't be canceled. This should be accessed with cancel_lock
//! held if it is not null, since in the asynchronous case it is accessed
//! from multiple threads.
bool request_canceled{false};
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
@ -82,11 +95,23 @@ template <>
struct ProxyServer<Thread> final : public Thread::Server
{
public:
ProxyServer(ThreadContext& thread_context, std::thread&& thread);
ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
~ProxyServer();
kj::Promise<void> getName(GetNameContext context) override;
//! Run a callback function fn returning T on this thread. The function will
//! be queued and executed as soon as the thread is idle, and when fn
//! returns, the promise returned by this method will be fulfilled with the
//! value fn returned.
template<typename T, typename Fn>
kj::Promise<T> post(Fn&& fn);
EventLoopRef m_loop;
ThreadContext& m_thread_context;
std::thread m_thread;
//! Promise signaled when m_thread_context.waiter is ready and there is no
//! post() callback function waiting to execute.
kj::Promise<void> m_thread_ready{kj::READY_NOW};
};
//! Handler for kj::TaskSet failed task events.
@ -322,7 +347,12 @@ public:
//! thread is blocked waiting for server response, this is what allows the
//! client to run the request in the same thread, the same way code would run in a
//! single process, with the callback sharing the same thread stack as the original
//! call.)
//! call.) To support this, the clientInvoke function calls Waiter::wait() to
//! block the client IPC thread while initial request is in progress. Then if
//! there is a callback, it is executed with Waiter::post().
//!
//! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
//! to execute IPC calls on worker threads.
struct Waiter
{
Waiter() = default;
@ -404,11 +434,11 @@ public:
template <typename F>
void onDisconnect(F&& f)
{
// Add disconnect handler to local TaskSet to ensure it is cancelled and
// Add disconnect handler to local TaskSet to ensure it is canceled and
// will never run after connection object is destroyed. But when disconnect
// handler fires, do not call the function f right away, instead add it
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
// error in the typical case where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
}
@ -416,6 +446,9 @@ public:
EventLoopRef m_loop;
kj::Own<kj::AsyncIoStream> m_stream;
LoggingErrorHandler m_error_handler{*m_loop};
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
//! disconnections, if the connection is closed locally first by deleting
//! this Connection object.
kj::TaskSet m_on_disconnect{m_error_handler};
::capnp::TwoPartyVatNetwork m_network;
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@ -428,6 +461,11 @@ public:
//! ThreadMap.makeThread) used to service requests to clients.
::capnp::CapabilityServerSet<Thread> m_threads;
//! Canceler for canceling promises that we want to discard when the
//! connection is destroyed. This is used to interrupt method calls that are
//! still executing at time of disconnection.
kj::Canceler m_canceler;
//! Cleanup functions to run if connection is broken unexpectedly. List
//! will be empty if all ProxyClient are destroyed cleanly before the
//! connection is destroyed.
@ -675,6 +713,70 @@ struct ThreadContext
bool loop_thread = false;
};
template<typename T, typename Fn>
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
{
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
// Keep a reference to the ProxyServer<Thread> instance by assigning it to
// the self variable. ProxyServer instances are reference-counted and if the
// client drops its reference, this variable keeps the instance alive until
// the thread finishes executing. The self variable needs to be destroyed on
// the event loop thread so it is freed in a sync() call below.
auto self = thisCap();
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
// Fulfill ready.promise now, as soon as the Waiter starts executing
// this lambda, so the next ProxyServer<Thread>::post() call can
// immediately call waiter->post(). It is important to do this
// before calling fn() because fn() can make an IPC call back to the
// client, which can make another IPC call to this server thread.
// (This typically happens when IPC methods take std::function
// parameters.) When this happens the second call to the server
// thread should not be blocked waiting for the first call.
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
ready_fulfiller->fulfill();
ready_fulfiller = nullptr;
});
std::optional<T> result_value;
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
// Destroy CancelMonitor here before fulfilling or rejecting the
// promise so it doesn't get triggered when the promise is
// destroyed.
cancel_monitor_ptr = nullptr;
// Send results to the fulfiller. Technically it would be ok to
// skip this if promise was canceled, but it's simpler to just
// do it unconditionally.
KJ_IF_MAYBE(e, exception) {
assert(!result_value);
result_fulfiller->reject(kj::mv(*e));
} else {
assert(result_value);
result_fulfiller->fulfill(kj::mv(*result_value));
result_value.reset();
}
result_fulfiller = nullptr;
// Use evalLater to destroy the ProxyServer<Thread> self
// reference, if it is the last reference, because the
// ProxyServer<Thread> destructor needs to join the thread,
// which can't happen until this sync() block has exited.
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
});
});
// Assert that calling Waiter::post did not fail. It could only return
// false if a new function was posted before the previous one finished
// executing, but new functions are only posted when m_thread_ready is
// signaled, so this should never happen.
assert(posted);
return kj::mv(result.promise);
}).attach(kj::heap<CancelProbe>(cancel_monitor));
m_thread_ready = kj::mv(ready.promise);
return ret;
}
//! Given stream file descriptor, make a new ProxyClient object to send requests
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.

View File

@ -445,9 +445,36 @@ struct ServerCall
template <typename ServerContext, typename... Args>
decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
{
return ProxyServerMethodTraits<typename decltype(server_context.call_context.getParams())::Reads>::invoke(
server_context,
std::forward<Args>(args)...);
// If cancel_lock is set, release it while executing the method, and
// reacquire it afterwards. The lock is needed to prevent params and
// response structs from being deleted by the event loop thread if the
// request is canceled, so it is only needed before and after method
// execution. It is important to release the lock during execution
// because the method can take arbitrarily long to return and the event
// loop will need the lock itself in on_cancel if the call is canceled.
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock();
return TryFinally(
[&]() -> decltype(auto) {
return ProxyServerMethodTraits<
typename decltype(server_context.call_context.getParams())::Reads
>::invoke(server_context, std::forward<Args>(args)...);
},
[&] {
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock();
// If the IPC request was canceled, throw InterruptException
// because there is no point continuing and trying to fill the
// call_context.getResults() struct. It's also important to stop
// executing because the connection may have been destroyed as
// described in https://github.com/bitcoin/bitcoin/issues/34250
// and there could be invalid references to the destroyed
// Connection object if this continued.
// If the IPC method itself threw an exception, the
// InterruptException thrown below will take precedence over it.
// Since the call has been canceled that exception can't be
// returned to the caller, so it needs to be discarded like
// other result values.
if (server_context.request_canceled) throw InterruptException{"canceled"};
});
}
};

View File

@ -153,6 +153,7 @@ public:
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
virtual ~ProxyServerBase();
void invokeDestroy();
using Interface_::Server::thisCap;
/**
* Implementation pointer that may or may not be owned and deleted when this

View File

@ -8,6 +8,8 @@
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <kj/string.h>
namespace mp {
template <typename Output>
void CustomBuildField(TypeList<>,
@ -26,7 +28,7 @@ void CustomBuildField(TypeList<>,
// future calls over this connection can reuse it.
auto [callback_thread, _]{SetThread(
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
// Call remote ThreadMap.makeThread function so server will create a
// dedicated worker thread to run function calls from this thread. Store the
@ -61,11 +63,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
auto& server = server_context.proxy_server;
int req = server_context.req;
auto invoke = [fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
// Keep a reference to the ProxyServer instance by assigning it to the self
// variable. ProxyServer instances are reference-counted and if the client
// drops its reference and the IPC call is canceled, this variable keeps the
// instance alive until the method finishes executing. The self variable
// needs to be destroyed on the event loop thread so it is freed in a sync()
// call below.
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
@ -90,8 +98,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
ConnThread request_thread;
bool inserted;
bool inserted{false};
Mutex cancel_mutex;
Lock cancel_lock{cancel_mutex};
server_context.cancel_lock = &cancel_lock;
server.m_context.loop->sync([&] {
// Detect request being canceled before it executes.
if (cancel_monitor.m_canceled) {
server_context.request_canceled = true;
return;
}
// Detect request being canceled while it executes.
assert(!cancel_monitor.m_on_cancel);
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
// Lock cancel_mutex here to block the event loop
// thread and prevent it from deleting the request's
// params and response structs while the execution
// thread is accessing them. Because this lock is
// released before the event loop thread does delete
// the structs, the mutex does not provide any
// protection from the event loop deleting the
// structs _before_ the execution thread acquires
// it. So in addition to locking the mutex, the
// execution thread always checks request_canceled
// as well before accessing the structs.
Lock cancel_lock{cancel_mutex};
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled.
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
@ -103,13 +138,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(if (erase_thread) {
KJ_DEFER(
// Release the cancel lock before calling loop->sync and
// waiting for the event loop thread, because if a
// cancellation happened, it needs to run the on_cancel
// callback above. It's safe to release cancel_lock at
// this point because the fn.invoke() call below will be
// finished and no longer accessing the params or
// results structs.
cancel_lock.m_lock.unlock();
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
// iterator since entry may no longer be there after
// a disconnect. Destroy node after releasing
@ -121,51 +166,52 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
}
});
});
fn.invoke(server_context, args...);
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_context.loop->sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_context.loop->sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
);
if (server_context.request_canceled) {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
try {
fn.invoke(server_context, args...);
} catch (const InterruptException& e) {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
}
})) {
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
throw kj::mv(*exception);
}
// End of scope: if KJ_DEFER was reached, it runs here
}
return call_context;
};
// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return server.m_context.connection->m_threads.getLocalServer(thread_client)
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
MP_LOG(*server.m_context.loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
} else {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
})
// Wait for the invocation to finish before returning to the caller.
.then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
});
// Use connection m_canceler object to cancel the result promise if the
// connection is destroyed. (By default Cap'n Proto does not cancel requests
// on disconnect, since it's possible clients might want to make requests
// and immediately disconnect without waiting for results, but not want the
// requests to be canceled.)
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
}
} // namespace mp

View File

@ -9,6 +9,7 @@
#include <cassert>
#include <cstddef>
#include <cstring>
#include <exception>
#include <functional>
#include <kj/string-tree.h>
#include <mutex>
@ -209,6 +210,38 @@ void Unlock(Lock& lock, Callback&& callback)
callback();
}
//! Invoke a function and run a follow-up action before returning the original
//! result.
//!
//! This can be used similarly to KJ_DEFER to run cleanup code, but works better
//! if the cleanup function can throw because it avoids clang bug
//! https://github.com/llvm/llvm-project/issues/12658 which skips calling
//! destructors in that case and can lead to memory leaks. Also, if both
//! functions throw, this lets one exception take precedence instead of
//! terminating due to having two active exceptions.
template <typename Fn, typename After>
decltype(auto) TryFinally(Fn&& fn, After&& after)
{
bool success{false};
using R = std::invoke_result_t<Fn>;
try {
if constexpr (std::is_void_v<R>) {
std::forward<Fn>(fn)();
success = true;
std::forward<After>(after)();
return;
} else {
decltype(auto) result = std::forward<Fn>(fn)();
success = true;
std::forward<After>(after)();
return result;
}
} catch (...) {
if (!success) std::forward<After>(after)();
throw;
}
}
//! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::string ThreadName(const char* exe_name);
@ -241,6 +274,69 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
inline const char* CharCast(const char* c) { return c; }
inline const char* CharCast(const unsigned char* c) { return (const char*)c; }
//! Exception thrown from code executing an IPC call that is interrupted.
struct InterruptException final : std::exception {
explicit InterruptException(std::string message) : m_message(std::move(message)) {}
const char* what() const noexcept override { return m_message.c_str(); }
std::string m_message;
};
class CancelProbe;
//! Helper class that detects when a promise is canceled. Used to detect
//! canceled requests and prevent potential crashes on unclean disconnects.
//!
//! In the future, this could also be used to support a way for wrapped C++
//! methods to detect cancellation (like approach #4 in
//! https://github.com/bitcoin/bitcoin/issues/33575).
class CancelMonitor
{
public:
inline ~CancelMonitor();
inline void promiseDestroyed(CancelProbe& probe);
bool m_canceled{false};
std::function<void()> m_on_cancel;
CancelProbe* m_probe{nullptr};
};
//! Helper object to attach to a promise and update a CancelMonitor.
class CancelProbe
{
public:
CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
{
assert(!monitor.m_probe);
monitor.m_probe = this;
}
~CancelProbe()
{
if (m_monitor) m_monitor->promiseDestroyed(*this);
}
CancelMonitor* m_monitor;
};
CancelMonitor::~CancelMonitor()
{
if (m_probe) {
assert(m_probe->m_monitor == this);
m_probe->m_monitor = nullptr;
m_probe = nullptr;
}
}
void CancelMonitor::promiseDestroyed(CancelProbe& probe)
{
// If promise is being destroyed, assume the promise has been canceled. In
// theory this method could be called when a promise was fulfilled or
// rejected rather than canceled, but it's safe to assume that's not the
// case because the CancelMonitor class is meant to be used inside code
// fulfilling or rejecting the promise and destroyed before doing so.
assert(m_probe == &probe);
m_canceled = true;
if (m_on_cancel) m_on_cancel();
m_probe = nullptr;
}
} // namespace mp
#endif // MP_UTIL_H

View File

@ -5,22 +5,30 @@
#ifndef MP_VERSION_H
#define MP_VERSION_H
//! Major version number. Should be incremented in the master branch before
//! changes that introduce major new features or break API compatibility, if
//! there are clients relying on the previous API. (If an API changes multiple
//! times and nothing uses the intermediate changes, it is sufficient to
//! increment this only once before the first change.)
//! @file
//! @brief Major and minor version numbers
//!
//! Versioning uses a cruder form of SemVer where the major number is
//! incremented with all significant changes, regardless of whether they are
//! backward compatible, and the minor number is treated like a patch level and
//! only incremented when a fix or backport is applied to an old branch.
//! Major version number. Should be incremented after any release or external
//! usage of the library (like a subtree update) so the previous number
//! identifies that release. Should also be incremented before any change that
//! breaks backward compatibility or introduces nontrivial features, so
//! downstream code can use it to detect compatibility.
//!
//! Each time this is incremented, a new stable branch should be created. E.g.
//! when this is incremented to 8, a "v7" stable branch should be created
//! pointing at the prior merge commit. The /doc/versions.md file should also be
//! updated, noting any significant or incompatible changes made since the
//! previous version, and backported to the stable branch before it is tagged.
#define MP_MAJOR_VERSION 7
//! previous version.
#define MP_MAJOR_VERSION 8
//! Minor version number. Should be incremented in stable branches after
//! backporting changes. The /doc/versions.md file in the master and stable
//! branches should also be updated to list the new minor version.
//! backporting changes. The /doc/versions.md file should also be updated to
//! list the new minor version.
#define MP_MINOR_VERSION 0
#endif // MP_VERSION_H

View File

@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix,
cpp_server << "#include <kj/async.h>\n";
cpp_server << "#include <kj/common.h>\n";
cpp_server << "#include <kj/exception.h>\n";
cpp_server << "#include <kj/tuple.h>\n";
cpp_server << "#include <mp/proxy.h>\n";
cpp_server << "#include <mp/util.h>\n";
cpp_server << "#include <" << PROXY_TYPES << ">\n";

View File

@ -38,7 +38,7 @@
namespace mp {
thread_local ThreadContext g_thread_context;
thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
@ -87,6 +87,10 @@ Connection::~Connection()
// event loop thread, and if there was a remote disconnect, this is called
// by an onDisconnect callback directly from the event loop thread.
assert(std::this_thread::get_id() == m_loop->m_thread_id);
// Try to cancel any calls that may be executing.
m_canceler.cancel("Interrupted by disconnect");
// Shut down RPC system first, since this will garbage collect any
// ProxyServer objects that were not freed before the connection was closed.
// Typically all ProxyServer objects associated with this connection will be
@ -362,8 +366,8 @@ ProxyClient<Thread>::~ProxyClient()
}
}
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
: m_thread_context(thread_context), m_thread(std::move(thread))
ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
: m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
{
assert(m_thread_context.waiter.get() != nullptr);
}
@ -418,7 +422,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
});
auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
context.getResults().setResult(kj::mv(thread_client));
return kj::READY_NOW;

View File

@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
callFn @17 () -> ();
callFnAsync @18 (context :Proxy.Context) -> ();
callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32);
}
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {

View File

@ -83,7 +83,9 @@ public:
std::shared_ptr<FooCallback> m_callback;
void callFn() { assert(m_fn); m_fn(); }
void callFnAsync() { assert(m_fn); m_fn(); }
int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); }
std::function<void()> m_fn;
std::function<int(int)> m_int_fn;
};
} // namespace test

View File

@ -8,7 +8,9 @@
#include <atomic>
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <functional>
#include <future>
@ -16,9 +18,7 @@
#include <kj/async-io.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <kj/test.h>
#include <memory>
#include <mp/proxy.h>
@ -31,7 +31,6 @@
#include <stdexcept>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <type_traits>
#include <utility>
@ -98,7 +97,7 @@ public:
client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
client_connection.get(), /* destroy_connection= */ client_owns_connection);
if (client_owns_connection) {
client_connection.release();
(void)client_connection.release();
} else {
client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
}
@ -317,7 +316,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
KJ_TEST("Make simultaneous IPC calls on single remote thread")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
@ -336,51 +335,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
});
setup.server->m_impl->m_fn = [&] {
try
{
signal.get_future().get();
}
catch (const std::future_error& e)
{
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
}
// Call callIntFnAsync 3 times with n=100, 200, 300
std::atomic<int> expected = 100;
setup.server->m_impl->m_int_fn = [&](int n) {
assert(n == expected);
expected += 100;
return n;
};
auto client{foo->m_client};
bool caught_thread_busy = false;
// NOTE: '3' was chosen because it was the lowest number
// of simultaneous calls required to reliably catch a "thread busy" error
std::atomic<size_t> running{3};
foo->m_context.loop->sync([&]
{
for (size_t i = 0; i < running; i++)
{
auto request{client.callFnAsyncRequest()};
auto request{client.callIntFnAsyncRequest()};
auto context{request.initContext()};
context.setCallbackThread(*callback_thread);
context.setThread(*request_thread);
request.setArg(100 * (i+1));
foo->m_context.loop->m_task_set->add(request.send().then(
[&](auto&& results) {
[&running, &tc, i](auto&& results) {
assert(results.getResult() == static_cast<int32_t>(100 * (i+1)));
running -= 1;
tc.waiter->m_cv.notify_all();
},
[&](kj::Exception&& e) {
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
"remote exception: std::exception: thread busy");
caught_thread_busy = true;
running -= 1;
signal.set_value();
tc.waiter->m_cv.notify_all();
}
));
}));
}
});
{
Lock lock(tc.waiter->m_mutex);
tc.waiter->wait(lock, [&running] { return running == 0; });
}
KJ_EXPECT(caught_thread_busy);
KJ_EXPECT(expected == 400);
}
} // namespace test