mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-03-06 03:36:18 +00:00
Merge bitcoin/bitcoin#34422: Update libmultiprocess subtree to be more stable with rust IPC client
8fe91f37194edcca1b7dfdd06bd0d4f5b2154e9b test: Updates needed after bitcoin-core/libmultiprocess#240 (Ryan Ofsky)
b7ca3bf061b51108d155283c1ad503c0af7eab0d Squashed 'src/ipc/libmultiprocess/' changes from 1fc65008f7d..1868a84451f (Ryan Ofsky)
1fea3bae5cabc2cd3105bfc277f219454698b00a ipc, test: Add tests for unclean disconnect and thread busy behavior (Ryan Ofsky)
Pull request description:
Includes:
- https://github.com/bitcoin-core/libmultiprocess/pull/241
- https://github.com/bitcoin-core/libmultiprocess/pull/240
- https://github.com/bitcoin-core/libmultiprocess/pull/244
- https://github.com/bitcoin-core/libmultiprocess/pull/245
The main change is https://github.com/bitcoin-core/libmultiprocess/pull/240 which fixes issues with asynchronous requests (https://github.com/bitcoin/bitcoin/issues/33923) and unclean disconnects (https://github.com/bitcoin/bitcoin/issues/34250) that happen with the rust mining client. It also adds tests for these fixes which had some previous review in #34284 (that PR was closed to simplify dependencies between PRs).
The changes can be verified by running `test/lint/git-subtree-check.sh src/ipc/libmultiprocess` as described in [developer notes](https://github.com/bitcoin/bitcoin/blob/master/doc/developer-notes.md#subtrees) and [lint instructions](https://github.com/bitcoin/bitcoin/tree/master/test/lint#git-subtree-checksh)
Resolves #33923 and #34250
ACKs for top commit:
Sjors:
re-ACK 8fe91f37194edcca1b7dfdd06bd0d4f5b2154e9b
janb84:
reACK 8fe91f37194edcca1b7dfdd06bd0d4f5b2154e9b
Eunovo:
ACK 8fe91f3719
Tree-SHA512: 7e8923610502ebd8603bbea703f82178ab9e956874d394da3451f5268afda2b964d0eeb399a74d49c4123e728a14c27c0296118577a6063ff03b2b8203a257ce
This commit is contained in:
commit
bf9ef4f043
@ -7,40 +7,55 @@ Library versions are tracked with simple
|
||||
Versioning policy is described in the [version.h](../include/mp/version.h)
|
||||
include.
|
||||
|
||||
## v7
|
||||
## v8
|
||||
- Current unstable version in master branch.
|
||||
- Intended to be compatible with Bitcoin Core 30.1 and future releases.
|
||||
|
||||
## v7.0
|
||||
- Adds SpawnProcess race fix, cmake `target_capnp_sources` option, ci and documentation improvements.
|
||||
- Used in Bitcoin Core master branch, pulled in by [#34363](https://github.com/bitcoin/bitcoin/pull/34363).
|
||||
|
||||
## v7.0-pre1
|
||||
|
||||
- Adds support for log levels to reduce logging and "thread busy" error to avoid a crash on misuse. Fixes intermittent mptest hang and makes other minor improvements.
|
||||
- Used in Bitcoin Core 30.1 and 30.2 releases and 30.x branch. Pulled in by [#33412](https://github.com/bitcoin/bitcoin/pull/33412), [#33518](https://github.com/bitcoin/bitcoin/pull/33518), and [#33519](https://github.com/bitcoin/bitcoin/pull/33519).
|
||||
|
||||
## v6.0
|
||||
- `EventLoop::addClient` and `EventLoop::removeClient` methods dropped,
|
||||
|
||||
- Adds fixes for unclean shutdowns and thread sanitizer issues and adds CI scripts.
|
||||
- Drops `EventLoop::addClient` and `EventLoop::removeClient` methods,
|
||||
requiring clients to use new `EventLoopRef` class instead.
|
||||
- Compatible with Bitcoin Core 30.0 release.
|
||||
- Used in Bitcoin Core 30.0 release. Pulled in by [#31741](https://github.com/bitcoin/bitcoin/pull/31741), [#32641](https://github.com/bitcoin/bitcoin/pull/32641), [#32345](https://github.com/bitcoin/bitcoin/pull/32345), [#33241](https://github.com/bitcoin/bitcoin/pull/33241), and [#33322](https://github.com/bitcoin/bitcoin/pull/33322).
|
||||
|
||||
## v5.0
|
||||
- Adds build improvements and tidy/warning fixes.
|
||||
- Used in Bitcoin Core 29 releases, pulled in by [#31945](https://github.com/bitcoin/bitcoin/pull/31945).
|
||||
|
||||
## v5.0-pre1
|
||||
- Adds many improvements to Bitcoin Core mining interface: splitting up type headers, fixing shutdown bugs, adding subtree build support.
|
||||
- Broke up `proxy-types.h` into `type-*.h` files requiring clients to explicitly
|
||||
include overloads needed for C++ ↔️ Cap'n Proto type conversions.
|
||||
- Now requires C++ 20 support.
|
||||
- Compatible with Bitcoin Core 29 releases.
|
||||
- Minimum required version for Bitcoin Core 29 releases, pulled in by [#30509](https://github.com/bitcoin/bitcoin/pull/30509), [#30510](https://github.com/bitcoin/bitcoin/pull/30510), [#31105](https://github.com/bitcoin/bitcoin/pull/31105), [#31740](https://github.com/bitcoin/bitcoin/pull/31740).
|
||||
|
||||
## v4.0
|
||||
- Added better cmake support, installing cmake package files so clients do not
|
||||
need to use pkgconfig.
|
||||
- Compatible with Bitcoin Core 28 releases.
|
||||
- Used in Bitcoin Core 28 releases, pulled in by [#30490](https://github.com/bitcoin/bitcoin/pull/30490) and [#30513](https://github.com/bitcoin/bitcoin/pull/30513).
|
||||
|
||||
## v3.0
|
||||
- Dropped compatibility with Cap'n Proto versions before 0.7.
|
||||
- Compatible with Bitcoin Core 27 releases.
|
||||
- Used in Bitcoin Core 27 releases, pulled in by [#28735](https://github.com/bitcoin/bitcoin/pull/28735), [#28907](https://github.com/bitcoin/bitcoin/pull/28907), and [#29276](https://github.com/bitcoin/bitcoin/pull/29276).
|
||||
|
||||
## v2.0
|
||||
- Changed `PassField` function signature.
|
||||
- Now requires C++17 support.
|
||||
- Compatible with Bitcoin Core 25 and 26 releases.
|
||||
- Used in Bitcoin Core 25 and 26 releases, pulled in by [#26672](https://github.com/bitcoin/bitcoin/pull/26672).
|
||||
|
||||
## v1.0
|
||||
- Dropped hardcoded includes in generated files, now requiring `include` and
|
||||
`includeTypes` annotations.
|
||||
- Compatible with Bitcoin Core 22, 23, and 24 releases.
|
||||
- Used in Bitcoin Core 22, 23, and 24 releases, pulled in by [#19160](https://github.com/bitcoin/bitcoin/pull/19160).
|
||||
|
||||
## v0.0
|
||||
- Initial version used in a downstream release.
|
||||
- Compatible with Bitcoin Core 21 releases.
|
||||
- Used in Bitcoin Core 21 releases, pulled in by [#16367](https://github.com/bitcoin/bitcoin/pull/16367), [#18588](https://github.com/bitcoin/bitcoin/pull/18588), and [#18677](https://github.com/bitcoin/bitcoin/pull/18677).
|
||||
|
||||
@ -48,6 +48,19 @@ struct ServerInvokeContext : InvokeContext
|
||||
ProxyServer& proxy_server;
|
||||
CallContext& call_context;
|
||||
int req;
|
||||
//! For IPC methods that execute asynchronously, not on the event-loop
|
||||
//! thread: lock preventing the event-loop thread from freeing the params or
|
||||
//! results structs if the request is canceled while the worker thread is
|
||||
//! reading params (`call_context.getParams()`) or writing results
|
||||
//! (`call_context.getResults()`).
|
||||
Lock* cancel_lock{nullptr};
|
||||
//! For IPC methods that execute asynchronously, not on the event-loop
|
||||
//! thread, this is set to true if the IPC call was canceled by the client
|
||||
//! or canceled by a disconnection. If the call runs on the event-loop
|
||||
//! thread, it can't be canceled. This should be accessed with cancel_lock
|
||||
//! held if it is not null, since in the asynchronous case it is accessed
|
||||
//! from multiple threads.
|
||||
bool request_canceled{false};
|
||||
|
||||
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
|
||||
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
|
||||
@ -82,11 +95,23 @@ template <>
|
||||
struct ProxyServer<Thread> final : public Thread::Server
|
||||
{
|
||||
public:
|
||||
ProxyServer(ThreadContext& thread_context, std::thread&& thread);
|
||||
ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
|
||||
~ProxyServer();
|
||||
kj::Promise<void> getName(GetNameContext context) override;
|
||||
|
||||
//! Run a callback function fn returning T on this thread. The function will
|
||||
//! be queued and executed as soon as the thread is idle, and when fn
|
||||
//! returns, the promise returned by this method will be fulfilled with the
|
||||
//! value fn returned.
|
||||
template<typename T, typename Fn>
|
||||
kj::Promise<T> post(Fn&& fn);
|
||||
|
||||
EventLoopRef m_loop;
|
||||
ThreadContext& m_thread_context;
|
||||
std::thread m_thread;
|
||||
//! Promise signaled when m_thread_context.waiter is ready and there is no
|
||||
//! post() callback function waiting to execute.
|
||||
kj::Promise<void> m_thread_ready{kj::READY_NOW};
|
||||
};
|
||||
|
||||
//! Handler for kj::TaskSet failed task events.
|
||||
@ -322,7 +347,12 @@ public:
|
||||
//! thread is blocked waiting for server response, this is what allows the
|
||||
//! client to run the request in the same thread, the same way code would run in a
|
||||
//! single process, with the callback sharing the same thread stack as the original
|
||||
//! call.)
|
||||
//! call.) To support this, the clientInvoke function calls Waiter::wait() to
|
||||
//! block the client IPC thread while initial request is in progress. Then if
|
||||
//! there is a callback, it is executed with Waiter::post().
|
||||
//!
|
||||
//! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
|
||||
//! to execute IPC calls on worker threads.
|
||||
struct Waiter
|
||||
{
|
||||
Waiter() = default;
|
||||
@ -404,11 +434,11 @@ public:
|
||||
template <typename F>
|
||||
void onDisconnect(F&& f)
|
||||
{
|
||||
// Add disconnect handler to local TaskSet to ensure it is cancelled and
|
||||
// Add disconnect handler to local TaskSet to ensure it is canceled and
|
||||
// will never run after connection object is destroyed. But when disconnect
|
||||
// handler fires, do not call the function f right away, instead add it
|
||||
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
|
||||
// error in cases where f deletes this Connection object.
|
||||
// error in the typical case where f deletes this Connection object.
|
||||
m_on_disconnect.add(m_network.onDisconnect().then(
|
||||
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
|
||||
}
|
||||
@ -416,6 +446,9 @@ public:
|
||||
EventLoopRef m_loop;
|
||||
kj::Own<kj::AsyncIoStream> m_stream;
|
||||
LoggingErrorHandler m_error_handler{*m_loop};
|
||||
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
|
||||
//! disconnections, if the connection is closed locally first by deleting
|
||||
//! this Connection object.
|
||||
kj::TaskSet m_on_disconnect{m_error_handler};
|
||||
::capnp::TwoPartyVatNetwork m_network;
|
||||
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
|
||||
@ -428,6 +461,11 @@ public:
|
||||
//! ThreadMap.makeThread) used to service requests to clients.
|
||||
::capnp::CapabilityServerSet<Thread> m_threads;
|
||||
|
||||
//! Canceler for canceling promises that we want to discard when the
|
||||
//! connection is destroyed. This is used to interrupt method calls that are
|
||||
//! still executing at time of disconnection.
|
||||
kj::Canceler m_canceler;
|
||||
|
||||
//! Cleanup functions to run if connection is broken unexpectedly. List
|
||||
//! will be empty if all ProxyClient are destroyed cleanly before the
|
||||
//! connection is destroyed.
|
||||
@ -675,6 +713,70 @@ struct ThreadContext
|
||||
bool loop_thread = false;
|
||||
};
|
||||
|
||||
template<typename T, typename Fn>
|
||||
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
|
||||
{
|
||||
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
|
||||
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
|
||||
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
|
||||
// Keep a reference to the ProxyServer<Thread> instance by assigning it to
|
||||
// the self variable. ProxyServer instances are reference-counted and if the
|
||||
// client drops its reference, this variable keeps the instance alive until
|
||||
// the thread finishes executing. The self variable needs to be destroyed on
|
||||
// the event loop thread so it is freed in a sync() call below.
|
||||
auto self = thisCap();
|
||||
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
|
||||
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
|
||||
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
|
||||
// Fulfill ready.promise now, as soon as the Waiter starts executing
|
||||
// this lambda, so the next ProxyServer<Thread>::post() call can
|
||||
// immediately call waiter->post(). It is important to do this
|
||||
// before calling fn() because fn() can make an IPC call back to the
|
||||
// client, which can make another IPC call to this server thread.
|
||||
// (This typically happens when IPC methods take std::function
|
||||
// parameters.) When this happens the second call to the server
|
||||
// thread should not be blocked waiting for the first call.
|
||||
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
|
||||
ready_fulfiller->fulfill();
|
||||
ready_fulfiller = nullptr;
|
||||
});
|
||||
std::optional<T> result_value;
|
||||
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
|
||||
m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
|
||||
// Destroy CancelMonitor here before fulfilling or rejecting the
|
||||
// promise so it doesn't get triggered when the promise is
|
||||
// destroyed.
|
||||
cancel_monitor_ptr = nullptr;
|
||||
// Send results to the fulfiller. Technically it would be ok to
|
||||
// skip this if promise was canceled, but it's simpler to just
|
||||
// do it unconditionally.
|
||||
KJ_IF_MAYBE(e, exception) {
|
||||
assert(!result_value);
|
||||
result_fulfiller->reject(kj::mv(*e));
|
||||
} else {
|
||||
assert(result_value);
|
||||
result_fulfiller->fulfill(kj::mv(*result_value));
|
||||
result_value.reset();
|
||||
}
|
||||
result_fulfiller = nullptr;
|
||||
// Use evalLater to destroy the ProxyServer<Thread> self
|
||||
// reference, if it is the last reference, because the
|
||||
// ProxyServer<Thread> destructor needs to join the thread,
|
||||
// which can't happen until this sync() block has exited.
|
||||
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
|
||||
});
|
||||
});
|
||||
// Assert that calling Waiter::post did not fail. It could only return
|
||||
// false if a new function was posted before the previous one finished
|
||||
// executing, but new functions are only posted when m_thread_ready is
|
||||
// signaled, so this should never happen.
|
||||
assert(posted);
|
||||
return kj::mv(result.promise);
|
||||
}).attach(kj::heap<CancelProbe>(cancel_monitor));
|
||||
m_thread_ready = kj::mv(ready.promise);
|
||||
return ret;
|
||||
}
|
||||
|
||||
//! Given stream file descriptor, make a new ProxyClient object to send requests
|
||||
//! over the stream. Also create a new Connection object embedded in the
|
||||
//! client that is freed when the client is closed.
|
||||
|
||||
@ -445,9 +445,36 @@ struct ServerCall
|
||||
template <typename ServerContext, typename... Args>
|
||||
decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
|
||||
{
|
||||
return ProxyServerMethodTraits<typename decltype(server_context.call_context.getParams())::Reads>::invoke(
|
||||
server_context,
|
||||
std::forward<Args>(args)...);
|
||||
// If cancel_lock is set, release it while executing the method, and
|
||||
// reacquire it afterwards. The lock is needed to prevent params and
|
||||
// response structs from being deleted by the event loop thread if the
|
||||
// request is canceled, so it is only needed before and after method
|
||||
// execution. It is important to release the lock during execution
|
||||
// because the method can take arbitrarily long to return and the event
|
||||
// loop will need the lock itself in on_cancel if the call is canceled.
|
||||
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock();
|
||||
return TryFinally(
|
||||
[&]() -> decltype(auto) {
|
||||
return ProxyServerMethodTraits<
|
||||
typename decltype(server_context.call_context.getParams())::Reads
|
||||
>::invoke(server_context, std::forward<Args>(args)...);
|
||||
},
|
||||
[&] {
|
||||
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock();
|
||||
// If the IPC request was canceled, throw InterruptException
|
||||
// because there is no point continuing and trying to fill the
|
||||
// call_context.getResults() struct. It's also important to stop
|
||||
// executing because the connection may have been destroyed as
|
||||
// described in https://github.com/bitcoin/bitcoin/issues/34250
|
||||
// and there could be invalid references to the destroyed
|
||||
// Connection object if this continued.
|
||||
// If the IPC method itself threw an exception, the
|
||||
// InterruptException thrown below will take precedence over it.
|
||||
// Since the call has been canceled that exception can't be
|
||||
// returned to the caller, so it needs to be discarded like
|
||||
// other result values.
|
||||
if (server_context.request_canceled) throw InterruptException{"canceled"};
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -153,6 +153,7 @@ public:
|
||||
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
|
||||
virtual ~ProxyServerBase();
|
||||
void invokeDestroy();
|
||||
using Interface_::Server::thisCap;
|
||||
|
||||
/**
|
||||
* Implementation pointer that may or may not be owned and deleted when this
|
||||
|
||||
@ -8,6 +8,8 @@
|
||||
#include <mp/proxy-io.h>
|
||||
#include <mp/util.h>
|
||||
|
||||
#include <kj/string.h>
|
||||
|
||||
namespace mp {
|
||||
template <typename Output>
|
||||
void CustomBuildField(TypeList<>,
|
||||
@ -26,7 +28,7 @@ void CustomBuildField(TypeList<>,
|
||||
// future calls over this connection can reuse it.
|
||||
auto [callback_thread, _]{SetThread(
|
||||
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
|
||||
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
|
||||
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
|
||||
|
||||
// Call remote ThreadMap.makeThread function so server will create a
|
||||
// dedicated worker thread to run function calls from this thread. Store the
|
||||
@ -61,11 +63,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
||||
{
|
||||
const auto& params = server_context.call_context.getParams();
|
||||
Context::Reader context_arg = Accessor::get(params);
|
||||
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
|
||||
auto& server = server_context.proxy_server;
|
||||
int req = server_context.req;
|
||||
auto invoke = [fulfiller = kj::mv(future.fulfiller),
|
||||
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
|
||||
// Keep a reference to the ProxyServer instance by assigning it to the self
|
||||
// variable. ProxyServer instances are reference-counted and if the client
|
||||
// drops its reference and the IPC call is canceled, this variable keeps the
|
||||
// instance alive until the method finishes executing. The self variable
|
||||
// needs to be destroyed on the event loop thread so it is freed in a sync()
|
||||
// call below.
|
||||
auto self = server.thisCap();
|
||||
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
|
||||
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
|
||||
const auto& params = call_context.getParams();
|
||||
Context::Reader context_arg = Accessor::get(params);
|
||||
ServerContext server_context{server, call_context, req};
|
||||
@ -90,8 +98,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
||||
auto& thread_context = g_thread_context;
|
||||
auto& request_threads = thread_context.request_threads;
|
||||
ConnThread request_thread;
|
||||
bool inserted;
|
||||
bool inserted{false};
|
||||
Mutex cancel_mutex;
|
||||
Lock cancel_lock{cancel_mutex};
|
||||
server_context.cancel_lock = &cancel_lock;
|
||||
server.m_context.loop->sync([&] {
|
||||
// Detect request being canceled before it executes.
|
||||
if (cancel_monitor.m_canceled) {
|
||||
server_context.request_canceled = true;
|
||||
return;
|
||||
}
|
||||
// Detect request being canceled while it executes.
|
||||
assert(!cancel_monitor.m_on_cancel);
|
||||
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
|
||||
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
|
||||
// Lock cancel_mutex here to block the event loop
|
||||
// thread and prevent it from deleting the request's
|
||||
// params and response structs while the execution
|
||||
// thread is accessing them. Because this lock is
|
||||
// released before the event loop thread does delete
|
||||
// the structs, the mutex does not provide any
|
||||
// protection from the event loop deleting the
|
||||
// structs _before_ the execution thread acquires
|
||||
// it. So in addition to locking the mutex, the
|
||||
// execution thread always checks request_canceled
|
||||
// as well before accessing the structs.
|
||||
Lock cancel_lock{cancel_mutex};
|
||||
server_context.request_canceled = true;
|
||||
};
|
||||
// Update requests_threads map if not canceled.
|
||||
std::tie(request_thread, inserted) = SetThread(
|
||||
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
|
||||
[&] { return context_arg.getCallbackThread(); });
|
||||
@ -103,13 +138,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
||||
// recursive call (IPC call calling back to the caller which
|
||||
// makes another IPC call), so avoid modifying the map.
|
||||
const bool erase_thread{inserted};
|
||||
KJ_DEFER(if (erase_thread) {
|
||||
KJ_DEFER(
|
||||
// Release the cancel lock before calling loop->sync and
|
||||
// waiting for the event loop thread, because if a
|
||||
// cancellation happened, it needs to run the on_cancel
|
||||
// callback above. It's safe to release cancel_lock at
|
||||
// this point because the fn.invoke() call below will be
|
||||
// finished and no longer accessing the params or
|
||||
// results structs.
|
||||
cancel_lock.m_lock.unlock();
|
||||
// Erase the request_threads entry on the event loop
|
||||
// thread with loop->sync(), so if the connection is
|
||||
// broken there is not a race between this thread and
|
||||
// the disconnect handler trying to destroy the thread
|
||||
// client object.
|
||||
server.m_context.loop->sync([&] {
|
||||
auto self_dispose{kj::mv(self)};
|
||||
if (erase_thread) {
|
||||
// Look up the thread again without using existing
|
||||
// iterator since entry may no longer be there after
|
||||
// a disconnect. Destroy node after releasing
|
||||
@ -121,51 +166,52 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
||||
Lock lock(thread_context.waiter->m_mutex);
|
||||
removed = request_threads.extract(server.m_context.connection);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
fn.invoke(server_context, args...);
|
||||
}
|
||||
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
|
||||
server.m_context.loop->sync([&] {
|
||||
auto fulfiller_dispose = kj::mv(fulfiller);
|
||||
fulfiller_dispose->fulfill(kj::mv(call_context));
|
||||
});
|
||||
}))
|
||||
{
|
||||
server.m_context.loop->sync([&]() {
|
||||
auto fulfiller_dispose = kj::mv(fulfiller);
|
||||
fulfiller_dispose->reject(kj::mv(*exception));
|
||||
});
|
||||
);
|
||||
if (server_context.request_canceled) {
|
||||
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
|
||||
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
|
||||
try {
|
||||
fn.invoke(server_context, args...);
|
||||
} catch (const InterruptException& e) {
|
||||
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
|
||||
}
|
||||
})) {
|
||||
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
|
||||
throw kj::mv(*exception);
|
||||
}
|
||||
// End of scope: if KJ_DEFER was reached, it runs here
|
||||
}
|
||||
return call_context;
|
||||
};
|
||||
|
||||
// Lookup Thread object specified by the client. The specified thread should
|
||||
// be a local Thread::Server object, but it needs to be looked up
|
||||
// asynchronously with getLocalServer().
|
||||
auto thread_client = context_arg.getThread();
|
||||
return server.m_context.connection->m_threads.getLocalServer(thread_client)
|
||||
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
|
||||
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
|
||||
// Assuming the thread object is found, pass it a pointer to the
|
||||
// `invoke` lambda above which will invoke the function on that
|
||||
// thread.
|
||||
KJ_IF_MAYBE (thread_server, perhaps) {
|
||||
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
|
||||
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
|
||||
MP_LOG(*server.m_context.loop, Log::Debug)
|
||||
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
|
||||
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
|
||||
MP_LOG(*server.m_context.loop, Log::Error)
|
||||
<< "IPC server error request #" << req
|
||||
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
|
||||
throw std::runtime_error("thread busy");
|
||||
}
|
||||
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
|
||||
} else {
|
||||
MP_LOG(*server.m_context.loop, Log::Error)
|
||||
<< "IPC server error request #" << req << ", missing thread to execute request";
|
||||
throw std::runtime_error("invalid thread handle");
|
||||
}
|
||||
})
|
||||
// Wait for the invocation to finish before returning to the caller.
|
||||
.then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
|
||||
});
|
||||
// Use connection m_canceler object to cancel the result promise if the
|
||||
// connection is destroyed. (By default Cap'n Proto does not cancel requests
|
||||
// on disconnect, since it's possible clients might want to make requests
|
||||
// and immediately disconnect without waiting for results, but not want the
|
||||
// requests to be canceled.)
|
||||
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
|
||||
}
|
||||
} // namespace mp
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
#include <cstring>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <kj/string-tree.h>
|
||||
#include <mutex>
|
||||
@ -209,6 +210,38 @@ void Unlock(Lock& lock, Callback&& callback)
|
||||
callback();
|
||||
}
|
||||
|
||||
//! Invoke a function and run a follow-up action before returning the original
|
||||
//! result.
|
||||
//!
|
||||
//! This can be used similarly to KJ_DEFER to run cleanup code, but works better
|
||||
//! if the cleanup function can throw because it avoids clang bug
|
||||
//! https://github.com/llvm/llvm-project/issues/12658 which skips calling
|
||||
//! destructors in that case and can lead to memory leaks. Also, if both
|
||||
//! functions throw, this lets one exception take precedence instead of
|
||||
//! terminating due to having two active exceptions.
|
||||
template <typename Fn, typename After>
|
||||
decltype(auto) TryFinally(Fn&& fn, After&& after)
|
||||
{
|
||||
bool success{false};
|
||||
using R = std::invoke_result_t<Fn>;
|
||||
try {
|
||||
if constexpr (std::is_void_v<R>) {
|
||||
std::forward<Fn>(fn)();
|
||||
success = true;
|
||||
std::forward<After>(after)();
|
||||
return;
|
||||
} else {
|
||||
decltype(auto) result = std::forward<Fn>(fn)();
|
||||
success = true;
|
||||
std::forward<After>(after)();
|
||||
return result;
|
||||
}
|
||||
} catch (...) {
|
||||
if (!success) std::forward<After>(after)();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
//! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
|
||||
std::string ThreadName(const char* exe_name);
|
||||
|
||||
@ -241,6 +274,69 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
|
||||
inline const char* CharCast(const char* c) { return c; }
|
||||
inline const char* CharCast(const unsigned char* c) { return (const char*)c; }
|
||||
|
||||
//! Exception thrown from code executing an IPC call that is interrupted.
|
||||
struct InterruptException final : std::exception {
|
||||
explicit InterruptException(std::string message) : m_message(std::move(message)) {}
|
||||
const char* what() const noexcept override { return m_message.c_str(); }
|
||||
std::string m_message;
|
||||
};
|
||||
|
||||
class CancelProbe;
|
||||
|
||||
//! Helper class that detects when a promise is canceled. Used to detect
|
||||
//! canceled requests and prevent potential crashes on unclean disconnects.
|
||||
//!
|
||||
//! In the future, this could also be used to support a way for wrapped C++
|
||||
//! methods to detect cancellation (like approach #4 in
|
||||
//! https://github.com/bitcoin/bitcoin/issues/33575).
|
||||
class CancelMonitor
|
||||
{
|
||||
public:
|
||||
inline ~CancelMonitor();
|
||||
inline void promiseDestroyed(CancelProbe& probe);
|
||||
|
||||
bool m_canceled{false};
|
||||
std::function<void()> m_on_cancel;
|
||||
CancelProbe* m_probe{nullptr};
|
||||
};
|
||||
|
||||
//! Helper object to attach to a promise and update a CancelMonitor.
|
||||
class CancelProbe
|
||||
{
|
||||
public:
|
||||
CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
|
||||
{
|
||||
assert(!monitor.m_probe);
|
||||
monitor.m_probe = this;
|
||||
}
|
||||
~CancelProbe()
|
||||
{
|
||||
if (m_monitor) m_monitor->promiseDestroyed(*this);
|
||||
}
|
||||
CancelMonitor* m_monitor;
|
||||
};
|
||||
|
||||
CancelMonitor::~CancelMonitor()
|
||||
{
|
||||
if (m_probe) {
|
||||
assert(m_probe->m_monitor == this);
|
||||
m_probe->m_monitor = nullptr;
|
||||
m_probe = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void CancelMonitor::promiseDestroyed(CancelProbe& probe)
|
||||
{
|
||||
// If promise is being destroyed, assume the promise has been canceled. In
|
||||
// theory this method could be called when a promise was fulfilled or
|
||||
// rejected rather than canceled, but it's safe to assume that's not the
|
||||
// case because the CancelMonitor class is meant to be used inside code
|
||||
// fulfilling or rejecting the promise and destroyed before doing so.
|
||||
assert(m_probe == &probe);
|
||||
m_canceled = true;
|
||||
if (m_on_cancel) m_on_cancel();
|
||||
m_probe = nullptr;
|
||||
}
|
||||
} // namespace mp
|
||||
|
||||
#endif // MP_UTIL_H
|
||||
|
||||
@ -5,22 +5,30 @@
|
||||
#ifndef MP_VERSION_H
|
||||
#define MP_VERSION_H
|
||||
|
||||
//! Major version number. Should be incremented in the master branch before
|
||||
//! changes that introduce major new features or break API compatibility, if
|
||||
//! there are clients relying on the previous API. (If an API changes multiple
|
||||
//! times and nothing uses the intermediate changes, it is sufficient to
|
||||
//! increment this only once before the first change.)
|
||||
//! @file
|
||||
//! @brief Major and minor version numbers
|
||||
//!
|
||||
//! Versioning uses a cruder form of SemVer where the major number is
|
||||
//! incremented with all significant changes, regardless of whether they are
|
||||
//! backward compatible, and the minor number is treated like a patch level and
|
||||
//! only incremented when a fix or backport is applied to an old branch.
|
||||
|
||||
//! Major version number. Should be incremented after any release or external
|
||||
//! usage of the library (like a subtree update) so the previous number
|
||||
//! identifies that release. Should also be incremented before any change that
|
||||
//! breaks backward compatibility or introduces nontrivial features, so
|
||||
//! downstream code can use it to detect compatibility.
|
||||
//!
|
||||
//! Each time this is incremented, a new stable branch should be created. E.g.
|
||||
//! when this is incremented to 8, a "v7" stable branch should be created
|
||||
//! pointing at the prior merge commit. The /doc/versions.md file should also be
|
||||
//! updated, noting any significant or incompatible changes made since the
|
||||
//! previous version, and backported to the stable branch before it is tagged.
|
||||
#define MP_MAJOR_VERSION 7
|
||||
//! previous version.
|
||||
#define MP_MAJOR_VERSION 8
|
||||
|
||||
//! Minor version number. Should be incremented in stable branches after
|
||||
//! backporting changes. The /doc/versions.md file in the master and stable
|
||||
//! branches should also be updated to list the new minor version.
|
||||
//! backporting changes. The /doc/versions.md file should also be updated to
|
||||
//! list the new minor version.
|
||||
#define MP_MINOR_VERSION 0
|
||||
|
||||
#endif // MP_VERSION_H
|
||||
|
||||
@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix,
|
||||
cpp_server << "#include <kj/async.h>\n";
|
||||
cpp_server << "#include <kj/common.h>\n";
|
||||
cpp_server << "#include <kj/exception.h>\n";
|
||||
cpp_server << "#include <kj/tuple.h>\n";
|
||||
cpp_server << "#include <mp/proxy.h>\n";
|
||||
cpp_server << "#include <mp/util.h>\n";
|
||||
cpp_server << "#include <" << PROXY_TYPES << ">\n";
|
||||
|
||||
@ -38,7 +38,7 @@
|
||||
|
||||
namespace mp {
|
||||
|
||||
thread_local ThreadContext g_thread_context;
|
||||
thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
|
||||
|
||||
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
|
||||
{
|
||||
@ -87,6 +87,10 @@ Connection::~Connection()
|
||||
// event loop thread, and if there was a remote disconnect, this is called
|
||||
// by an onDisconnect callback directly from the event loop thread.
|
||||
assert(std::this_thread::get_id() == m_loop->m_thread_id);
|
||||
|
||||
// Try to cancel any calls that may be executing.
|
||||
m_canceler.cancel("Interrupted by disconnect");
|
||||
|
||||
// Shut down RPC system first, since this will garbage collect any
|
||||
// ProxyServer objects that were not freed before the connection was closed.
|
||||
// Typically all ProxyServer objects associated with this connection will be
|
||||
@ -362,8 +366,8 @@ ProxyClient<Thread>::~ProxyClient()
|
||||
}
|
||||
}
|
||||
|
||||
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
|
||||
: m_thread_context(thread_context), m_thread(std::move(thread))
|
||||
ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
|
||||
: m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
|
||||
{
|
||||
assert(m_thread_context.waiter.get() != nullptr);
|
||||
}
|
||||
@ -418,7 +422,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
|
||||
// is just waiter getting set to null.)
|
||||
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
|
||||
});
|
||||
auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
|
||||
auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
|
||||
auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
|
||||
context.getResults().setResult(kj::mv(thread_client));
|
||||
return kj::READY_NOW;
|
||||
|
||||
@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
|
||||
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
|
||||
callFn @17 () -> ();
|
||||
callFnAsync @18 (context :Proxy.Context) -> ();
|
||||
callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32);
|
||||
}
|
||||
|
||||
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {
|
||||
|
||||
@ -83,7 +83,9 @@ public:
|
||||
std::shared_ptr<FooCallback> m_callback;
|
||||
void callFn() { assert(m_fn); m_fn(); }
|
||||
void callFnAsync() { assert(m_fn); m_fn(); }
|
||||
int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); }
|
||||
std::function<void()> m_fn;
|
||||
std::function<int(int)> m_int_fn;
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
|
||||
@ -8,7 +8,9 @@
|
||||
#include <atomic>
|
||||
#include <capnp/capability.h>
|
||||
#include <capnp/rpc.h>
|
||||
#include <cassert>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
@ -16,9 +18,7 @@
|
||||
#include <kj/async-io.h>
|
||||
#include <kj/common.h>
|
||||
#include <kj/debug.h>
|
||||
#include <kj/exception.h>
|
||||
#include <kj/memory.h>
|
||||
#include <kj/string.h>
|
||||
#include <kj/test.h>
|
||||
#include <memory>
|
||||
#include <mp/proxy.h>
|
||||
@ -31,7 +31,6 @@
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <system_error>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
@ -98,7 +97,7 @@ public:
|
||||
client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
|
||||
client_connection.get(), /* destroy_connection= */ client_owns_connection);
|
||||
if (client_owns_connection) {
|
||||
client_connection.release();
|
||||
(void)client_connection.release();
|
||||
} else {
|
||||
client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
|
||||
}
|
||||
@ -317,7 +316,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
|
||||
signal.set_value();
|
||||
}
|
||||
|
||||
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
|
||||
KJ_TEST("Make simultaneous IPC calls on single remote thread")
|
||||
{
|
||||
TestSetup setup;
|
||||
ProxyClient<messages::FooInterface>* foo = setup.client.get();
|
||||
@ -336,51 +335,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
|
||||
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
|
||||
});
|
||||
|
||||
setup.server->m_impl->m_fn = [&] {
|
||||
try
|
||||
{
|
||||
signal.get_future().get();
|
||||
}
|
||||
catch (const std::future_error& e)
|
||||
{
|
||||
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
|
||||
}
|
||||
// Call callIntFnAsync 3 times with n=100, 200, 300
|
||||
std::atomic<int> expected = 100;
|
||||
|
||||
setup.server->m_impl->m_int_fn = [&](int n) {
|
||||
assert(n == expected);
|
||||
expected += 100;
|
||||
return n;
|
||||
};
|
||||
|
||||
auto client{foo->m_client};
|
||||
bool caught_thread_busy = false;
|
||||
// NOTE: '3' was chosen because it was the lowest number
|
||||
// of simultaneous calls required to reliably catch a "thread busy" error
|
||||
std::atomic<size_t> running{3};
|
||||
foo->m_context.loop->sync([&]
|
||||
{
|
||||
for (size_t i = 0; i < running; i++)
|
||||
{
|
||||
auto request{client.callFnAsyncRequest()};
|
||||
auto request{client.callIntFnAsyncRequest()};
|
||||
auto context{request.initContext()};
|
||||
context.setCallbackThread(*callback_thread);
|
||||
context.setThread(*request_thread);
|
||||
request.setArg(100 * (i+1));
|
||||
foo->m_context.loop->m_task_set->add(request.send().then(
|
||||
[&](auto&& results) {
|
||||
[&running, &tc, i](auto&& results) {
|
||||
assert(results.getResult() == static_cast<int32_t>(100 * (i+1)));
|
||||
running -= 1;
|
||||
tc.waiter->m_cv.notify_all();
|
||||
},
|
||||
[&](kj::Exception&& e) {
|
||||
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
|
||||
"remote exception: std::exception: thread busy");
|
||||
caught_thread_busy = true;
|
||||
running -= 1;
|
||||
signal.set_value();
|
||||
tc.waiter->m_cv.notify_all();
|
||||
}
|
||||
));
|
||||
}));
|
||||
}
|
||||
});
|
||||
{
|
||||
Lock lock(tc.waiter->m_mutex);
|
||||
tc.waiter->wait(lock, [&running] { return running == 0; });
|
||||
}
|
||||
KJ_EXPECT(caught_thread_busy);
|
||||
KJ_EXPECT(expected == 400);
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
"""Test the IPC (multiprocess) interface."""
|
||||
import asyncio
|
||||
|
||||
from contextlib import ExitStack
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal
|
||||
from test_framework.ipc_util import (
|
||||
@ -81,10 +82,97 @@ class IPCInterfaceTest(BitcoinTestFramework):
|
||||
assert_equal(e.type, "FAILED")
|
||||
asyncio.run(capnp.run(async_routine()))
|
||||
|
||||
def run_unclean_disconnect_test(self):
|
||||
"""Test behavior when disconnecting during an IPC call that later
|
||||
returns a non-null interface pointer. This used to cause a crash as
|
||||
reported https://github.com/bitcoin/bitcoin/issues/34250, but now just
|
||||
results in a cancellation log message"""
|
||||
node = self.nodes[0]
|
||||
self.log.info("Running disconnect during BlockTemplate.waitNext")
|
||||
timeout = self.rpc_timeout * 1000.0
|
||||
disconnected_log_check = ExitStack()
|
||||
|
||||
async def async_routine():
|
||||
ctx, init = await make_capnp_init_ctx(self)
|
||||
self.log.debug("Create Mining proxy object")
|
||||
mining = init.makeMining(ctx).result
|
||||
|
||||
self.log.debug("Create a template")
|
||||
opts = self.capnp_modules['mining'].BlockCreateOptions()
|
||||
template = (await mining.createNewBlock(ctx, opts)).result
|
||||
|
||||
self.log.debug("Wait for a new template")
|
||||
waitoptions = self.capnp_modules['mining'].BlockWaitOptions()
|
||||
waitoptions.timeout = timeout
|
||||
waitoptions.feeThreshold = 1
|
||||
with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2):
|
||||
promise = template.waitNext(ctx, waitoptions)
|
||||
await asyncio.sleep(0.1)
|
||||
disconnected_log_check.enter_context(node.assert_debug_log(expected_msgs=["IPC server: socket disconnected", "canceled while executing"], timeout=2))
|
||||
del promise
|
||||
|
||||
asyncio.run(capnp.run(async_routine()))
|
||||
|
||||
# Wait for socket disconnected log message, then generate a block to
|
||||
# cause the waitNext() call to return a new template. Look for a
|
||||
# canceled IPC log message after waitNext returns.
|
||||
with node.assert_debug_log(expected_msgs=["interrupted (canceled)"], timeout=2):
|
||||
disconnected_log_check.close()
|
||||
self.generate(node, 1)
|
||||
|
||||
def run_thread_busy_test(self):
|
||||
"""Test behavior when sending multiple calls to the same server thread
|
||||
which used to cause a crash as reported
|
||||
https://github.com/bitcoin/bitcoin/issues/33923."""
|
||||
node = self.nodes[0]
|
||||
self.log.info("Running thread busy test")
|
||||
timeout = self.rpc_timeout * 1000.0
|
||||
|
||||
async def async_routine():
|
||||
ctx, init = await make_capnp_init_ctx(self)
|
||||
self.log.debug("Create Mining proxy object")
|
||||
mining = init.makeMining(ctx).result
|
||||
|
||||
self.log.debug("Create a template")
|
||||
opts = self.capnp_modules['mining'].BlockCreateOptions()
|
||||
template = (await mining.createNewBlock(ctx, opts)).result
|
||||
|
||||
self.log.debug("Wait for a new template")
|
||||
waitoptions = self.capnp_modules['mining'].BlockWaitOptions()
|
||||
waitoptions.timeout = timeout
|
||||
waitoptions.feeThreshold = 1
|
||||
|
||||
# Make multiple waitNext calls where the first will start to
|
||||
# execute, and the second and third will be posted waiting to
|
||||
# execute. Previously, the third call would fail calling
|
||||
# mp::Waiter::post() because the waiting function slot is occupied,
|
||||
# but now posts are queued.
|
||||
with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2):
|
||||
promise1 = template.waitNext(ctx, waitoptions)
|
||||
await asyncio.sleep(0.1)
|
||||
with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2):
|
||||
promise2 = template.waitNext(ctx, waitoptions)
|
||||
await asyncio.sleep(0.1)
|
||||
with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2):
|
||||
promise3 = template.waitNext(ctx, waitoptions)
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Generate a new block to make the active waitNext calls return, then clean up.
|
||||
with node.assert_debug_log(expected_msgs=["IPC server send response"], timeout=2):
|
||||
self.generate(node, 1, sync_fun=self.no_op)
|
||||
await ((await promise1).result).destroy(ctx)
|
||||
await ((await promise2).result).destroy(ctx)
|
||||
await ((await promise3).result).destroy(ctx)
|
||||
await template.destroy(ctx)
|
||||
|
||||
asyncio.run(capnp.run(async_routine()))
|
||||
|
||||
def run_test(self):
|
||||
self.run_echo_test()
|
||||
self.run_mining_test()
|
||||
self.run_deprecated_mining_test()
|
||||
self.run_unclean_disconnect_test()
|
||||
self.run_thread_busy_test()
|
||||
|
||||
if __name__ == '__main__':
|
||||
IPCInterfaceTest(__file__).main()
|
||||
|
||||
@ -7,7 +7,7 @@ import asyncio
|
||||
import time
|
||||
from contextlib import AsyncExitStack
|
||||
from io import BytesIO
|
||||
import re
|
||||
import platform
|
||||
from test_framework.blocktools import NULL_OUTPOINT
|
||||
from test_framework.messages import (
|
||||
MAX_BLOCK_WEIGHT,
|
||||
@ -313,17 +313,15 @@ class IPCMiningTest(BitcoinTestFramework):
|
||||
await mining.createNewBlock(ctx, opts)
|
||||
raise AssertionError("createNewBlock unexpectedly succeeded")
|
||||
except capnp.lib.capnp.KjException as e:
|
||||
if e.type == "DISCONNECTED":
|
||||
# The remote exception isn't caught currently and leads to a
|
||||
# std::terminate call. Just detect and restart in this case.
|
||||
# This bug is fixed with
|
||||
# https://github.com/bitcoin-core/libmultiprocess/pull/218
|
||||
assert_equal(e.description, "Peer disconnected.")
|
||||
self.nodes[0].wait_until_stopped(expected_ret_code=(-11, -6, 1, 66), expected_stderr=re.compile(""))
|
||||
self.start_node(0)
|
||||
if e.description == "remote exception: unknown non-KJ exception of type: kj::Exception":
|
||||
# macOS + REDUCE_EXPORTS bug: Cap'n Proto fails to recognize
|
||||
# its own exception type and returns a generic error instead.
|
||||
# https://github.com/bitcoin/bitcoin/pull/34422#discussion_r2863852691
|
||||
# Assert this only occurs on Darwin until fixed.
|
||||
assert_equal(platform.system(), "Darwin")
|
||||
else:
|
||||
assert_equal(e.description, "remote exception: std::exception: block_reserved_weight (0) must be at least 2000 weight units")
|
||||
assert_equal(e.type, "FAILED")
|
||||
assert_equal(e.type, "FAILED")
|
||||
|
||||
asyncio.run(capnp.run(async_routine()))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user