From 1fea3bae5cabc2cd3105bfc277f219454698b00a Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 14 Jan 2026 08:37:37 -0500 Subject: [PATCH 1/3] ipc, test: Add tests for unclean disconnect and thread busy behavior Upcoming libmultiprocess changes are expected to alter this behavior (https://github.com/bitcoin/bitcoin/issues/34250#issuecomment-3749243782), making test coverage useful for documenting current behavior and validating the intended changes. --- test/functional/interface_ipc.py | 97 ++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/test/functional/interface_ipc.py b/test/functional/interface_ipc.py index 2fc4497a426..f90f72b0ece 100755 --- a/test/functional/interface_ipc.py +++ b/test/functional/interface_ipc.py @@ -4,7 +4,10 @@ # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the IPC (multiprocess) interface.""" import asyncio +import http.client +import re +from contextlib import ExitStack from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.ipc_util import ( @@ -81,10 +84,104 @@ class IPCInterfaceTest(BitcoinTestFramework): assert_equal(e.type, "FAILED") asyncio.run(capnp.run(async_routine())) + def run_unclean_disconnect_test(self): + """Test behavior when disconnecting during an IPC call that later + returns a non-null interface pointer. Currently this behavior causes a + crash as reported https://github.com/bitcoin/bitcoin/issues/34250, but a + followup will change this behavior.""" + node = self.nodes[0] + self.log.info("Running disconnect during BlockTemplate.waitNext") + timeout = self.rpc_timeout * 1000.0 + disconnected_log_check = ExitStack() + + async def async_routine(): + ctx, init = await make_capnp_init_ctx(self) + self.log.debug("Create Mining proxy object") + mining = init.makeMining(ctx).result + + self.log.debug("Create a template") + opts = self.capnp_modules['mining'].BlockCreateOptions() + template = (await mining.createNewBlock(ctx, opts)).result + + self.log.debug("Wait for a new template") + waitoptions = self.capnp_modules['mining'].BlockWaitOptions() + waitoptions.timeout = timeout + waitoptions.feeThreshold = 1 + with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): + promise = template.waitNext(ctx, waitoptions) + await asyncio.sleep(0.1) + disconnected_log_check.enter_context(node.assert_debug_log(expected_msgs=["IPC server: socket disconnected"], timeout=2)) + del promise + + asyncio.run(capnp.run(async_routine())) + + # Wait for socket disconnected log message, then generate a block to + # cause the waitNext() call to return a new template. This will cause a + # crash and disconnect with error output. + disconnected_log_check.close() + try: + self.generate(node, 1) + except (http.client.RemoteDisconnected, BrokenPipeError, ConnectionResetError): + pass + node.wait_until_stopped(expected_ret_code=(-11, -6, 1, 66), expected_stderr=re.compile("")) + self.start_node(0) + + def run_thread_busy_test(self): + """Test behavior when sending multiple calls to the same server thread + which used to cause a crash as reported + https://github.com/bitcoin/bitcoin/issues/33923 and currently causes a + thread busy error. A future change will make this just queue the calls + for execution and not trigger any error""" + node = self.nodes[0] + self.log.info("Running thread busy test") + timeout = self.rpc_timeout * 1000.0 + + async def async_routine(): + ctx, init = await make_capnp_init_ctx(self) + self.log.debug("Create Mining proxy object") + mining = init.makeMining(ctx).result + + self.log.debug("Create a template") + opts = self.capnp_modules['mining'].BlockCreateOptions() + template = (await mining.createNewBlock(ctx, opts)).result + + self.log.debug("Wait for a new template") + waitoptions = self.capnp_modules['mining'].BlockWaitOptions() + waitoptions.timeout = timeout + waitoptions.feeThreshold = 1 + + # Make multiple waitNext calls where the first will start to + # execute, the second will be posted waiting to execute, and the + # third will fail to execute because the execution thread is busy. + with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): + promise1 = template.waitNext(ctx, waitoptions) + await asyncio.sleep(0.1) + with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): + promise2 = template.waitNext(ctx, waitoptions) + await asyncio.sleep(0.1) + try: + await template.waitNext(ctx, waitoptions) + except capnp.lib.capnp.KjException as e: + assert_equal(e.description, "remote exception: std::exception: thread busy") + assert_equal(e.type, "FAILED") + else: + raise AssertionError("Expected thread busy exception") + + # Generate a new block to make the active waitNext calls return, then clean up. + with node.assert_debug_log(expected_msgs=["IPC server send response"], timeout=2): + self.generate(node, 1, sync_fun=self.no_op) + await ((await promise1).result).destroy(ctx) + await ((await promise2).result).destroy(ctx) + await template.destroy(ctx) + + asyncio.run(capnp.run(async_routine())) + def run_test(self): self.run_echo_test() self.run_mining_test() self.run_deprecated_mining_test() + self.run_unclean_disconnect_test() + self.run_thread_busy_test() if __name__ == '__main__': IPCInterfaceTest(__file__).main() From b7ca3bf061b51108d155283c1ad503c0af7eab0d Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 25 Feb 2026 11:08:49 -0500 Subject: [PATCH 2/3] Squashed 'src/ipc/libmultiprocess/' changes from 1fc65008f7d..1868a84451f 1868a84451f Merge bitcoin-core/libmultiprocess#245: type-context.h: Extent cancel_mutex lock to prevent theoretical race fd4a90d3103 Merge bitcoin-core/libmultiprocess#244: ci: suppress two tidy lint issues 16dfc368640 ci: avoid bugprone-unused-return-value lint in test dacd5eda464 ci: suppress nontrivial-threadlocal lint in proxy.cpp ef96a5b2be2 doc: Comment cleanups after #240 e0f1cd76219 type-context.h: Extent cancel_mutex lock to prevent theoretical race 290702c74ce Merge bitcoin-core/libmultiprocess#240: Avoid errors from asynchronous (non-c++) clients 3a69d4755af Merge bitcoin-core/libmultiprocess#241: doc: Bump version number v7 -> v8 0174450ca2e Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer ddb5f74196f Allow simultaneous calls on same Context.thread c4762c7b513 refactor: Add ProxyServer::post() method 0ade1b40ac5 doc: Bump version number git-subtree-dir: src/ipc/libmultiprocess git-subtree-split: 1868a84451fe1b6a00116375a5f717230bb2533e --- doc/versions.md | 35 ++++++++---- include/mp/proxy-io.h | 110 ++++++++++++++++++++++++++++++++++++-- include/mp/proxy-types.h | 33 ++++++++++-- include/mp/proxy.h | 1 + include/mp/type-context.h | 108 ++++++++++++++++++++++++++----------- include/mp/util.h | 96 +++++++++++++++++++++++++++++++++ include/mp/version.h | 26 +++++---- src/mp/gen.cpp | 1 + src/mp/proxy.cpp | 12 +++-- test/mp/test/foo.capnp | 1 + test/mp/test/foo.h | 2 + test/mp/test/test.cpp | 47 ++++++---------- 12 files changed, 381 insertions(+), 91 deletions(-) diff --git a/doc/versions.md b/doc/versions.md index 8623fcdb4fd..9a7b12238f0 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -7,40 +7,55 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v7 +## v8 - Current unstable version in master branch. -- Intended to be compatible with Bitcoin Core 30.1 and future releases. + +## v7.0 +- Adds SpawnProcess race fix, cmake `target_capnp_sources` option, ci and documentation improvements. +- Used in Bitcoin Core master branch, pulled in by [#34363](https://github.com/bitcoin/bitcoin/pull/34363). + +## v7.0-pre1 + +- Adds support for log levels to reduce logging and "thread busy" error to avoid a crash on misuse. Fixes intermittent mptest hang and makes other minor improvements. +- Used in Bitcoin Core 30.1 and 30.2 releases and 30.x branch. Pulled in by [#33412](https://github.com/bitcoin/bitcoin/pull/33412), [#33518](https://github.com/bitcoin/bitcoin/pull/33518), and [#33519](https://github.com/bitcoin/bitcoin/pull/33519). ## v6.0 -- `EventLoop::addClient` and `EventLoop::removeClient` methods dropped, + +- Adds fixes for unclean shutdowns and thread sanitizer issues and adds CI scripts. +- Drops `EventLoop::addClient` and `EventLoop::removeClient` methods, requiring clients to use new `EventLoopRef` class instead. -- Compatible with Bitcoin Core 30.0 release. +- Used in Bitcoin Core 30.0 release. Pulled in by [#31741](https://github.com/bitcoin/bitcoin/pull/31741), [#32641](https://github.com/bitcoin/bitcoin/pull/32641), [#32345](https://github.com/bitcoin/bitcoin/pull/32345), [#33241](https://github.com/bitcoin/bitcoin/pull/33241), and [#33322](https://github.com/bitcoin/bitcoin/pull/33322). ## v5.0 +- Adds build improvements and tidy/warning fixes. +- Used in Bitcoin Core 29 releases, pulled in by [#31945](https://github.com/bitcoin/bitcoin/pull/31945). + +## v5.0-pre1 +- Adds many improvements to Bitcoin Core mining interface: splitting up type headers, fixing shutdown bugs, adding subtree build support. - Broke up `proxy-types.h` into `type-*.h` files requiring clients to explicitly include overloads needed for C++ ↔️ Cap'n Proto type conversions. - Now requires C++ 20 support. -- Compatible with Bitcoin Core 29 releases. +- Minimum required version for Bitcoin Core 29 releases, pulled in by [#30509](https://github.com/bitcoin/bitcoin/pull/30509), [#30510](https://github.com/bitcoin/bitcoin/pull/30510), [#31105](https://github.com/bitcoin/bitcoin/pull/31105), [#31740](https://github.com/bitcoin/bitcoin/pull/31740). ## v4.0 - Added better cmake support, installing cmake package files so clients do not need to use pkgconfig. -- Compatible with Bitcoin Core 28 releases. +- Used in Bitcoin Core 28 releases, pulled in by [#30490](https://github.com/bitcoin/bitcoin/pull/30490) and [#30513](https://github.com/bitcoin/bitcoin/pull/30513). ## v3.0 - Dropped compatibility with Cap'n Proto versions before 0.7. -- Compatible with Bitcoin Core 27 releases. +- Used in Bitcoin Core 27 releases, pulled in by [#28735](https://github.com/bitcoin/bitcoin/pull/28735), [#28907](https://github.com/bitcoin/bitcoin/pull/28907), and [#29276](https://github.com/bitcoin/bitcoin/pull/29276). ## v2.0 - Changed `PassField` function signature. - Now requires C++17 support. -- Compatible with Bitcoin Core 25 and 26 releases. +- Used in Bitcoin Core 25 and 26 releases, pulled in by [#26672](https://github.com/bitcoin/bitcoin/pull/26672). ## v1.0 - Dropped hardcoded includes in generated files, now requiring `include` and `includeTypes` annotations. -- Compatible with Bitcoin Core 22, 23, and 24 releases. +- Used in Bitcoin Core 22, 23, and 24 releases, pulled in by [#19160](https://github.com/bitcoin/bitcoin/pull/19160). ## v0.0 - Initial version used in a downstream release. -- Compatible with Bitcoin Core 21 releases. +- Used in Bitcoin Core 21 releases, pulled in by [#16367](https://github.com/bitcoin/bitcoin/pull/16367), [#18588](https://github.com/bitcoin/bitcoin/pull/18588), and [#18677](https://github.com/bitcoin/bitcoin/pull/18677). diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index b3c67a3206f..c298257c4da 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -48,6 +48,19 @@ struct ServerInvokeContext : InvokeContext ProxyServer& proxy_server; CallContext& call_context; int req; + //! For IPC methods that execute asynchronously, not on the event-loop + //! thread: lock preventing the event-loop thread from freeing the params or + //! results structs if the request is canceled while the worker thread is + //! reading params (`call_context.getParams()`) or writing results + //! (`call_context.getResults()`). + Lock* cancel_lock{nullptr}; + //! For IPC methods that execute asynchronously, not on the event-loop + //! thread, this is set to true if the IPC call was canceled by the client + //! or canceled by a disconnection. If the call runs on the event-loop + //! thread, it can't be canceled. This should be accessed with cancel_lock + //! held if it is not null, since in the asynchronous case it is accessed + //! from multiple threads. + bool request_canceled{false}; ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req) : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req} @@ -82,11 +95,23 @@ template <> struct ProxyServer final : public Thread::Server { public: - ProxyServer(ThreadContext& thread_context, std::thread&& thread); + ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread); ~ProxyServer(); kj::Promise getName(GetNameContext context) override; + + //! Run a callback function fn returning T on this thread. The function will + //! be queued and executed as soon as the thread is idle, and when fn + //! returns, the promise returned by this method will be fulfilled with the + //! value fn returned. + template + kj::Promise post(Fn&& fn); + + EventLoopRef m_loop; ThreadContext& m_thread_context; std::thread m_thread; + //! Promise signaled when m_thread_context.waiter is ready and there is no + //! post() callback function waiting to execute. + kj::Promise m_thread_ready{kj::READY_NOW}; }; //! Handler for kj::TaskSet failed task events. @@ -322,7 +347,12 @@ public: //! thread is blocked waiting for server response, this is what allows the //! client to run the request in the same thread, the same way code would run in a //! single process, with the callback sharing the same thread stack as the original -//! call.) +//! call.) To support this, the clientInvoke function calls Waiter::wait() to +//! block the client IPC thread while initial request is in progress. Then if +//! there is a callback, it is executed with Waiter::post(). +//! +//! The Waiter class is also used server-side by `ProxyServer::post()` +//! to execute IPC calls on worker threads. struct Waiter { Waiter() = default; @@ -404,11 +434,11 @@ public: template void onDisconnect(F&& f) { - // Add disconnect handler to local TaskSet to ensure it is cancelled and + // Add disconnect handler to local TaskSet to ensure it is canceled and // will never run after connection object is destroyed. But when disconnect // handler fires, do not call the function f right away, instead add it // to the EventLoop TaskSet to avoid "Promise callback destroyed itself" - // error in cases where f deletes this Connection object. + // error in the typical case where f deletes this Connection object. m_on_disconnect.add(m_network.onDisconnect().then( [f = std::forward(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); })); } @@ -416,6 +446,9 @@ public: EventLoopRef m_loop; kj::Own m_stream; LoggingErrorHandler m_error_handler{*m_loop}; + //! TaskSet used to cancel the m_network.onDisconnect() handler for remote + //! disconnections, if the connection is closed locally first by deleting + //! this Connection object. kj::TaskSet m_on_disconnect{m_error_handler}; ::capnp::TwoPartyVatNetwork m_network; std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system; @@ -428,6 +461,11 @@ public: //! ThreadMap.makeThread) used to service requests to clients. ::capnp::CapabilityServerSet m_threads; + //! Canceler for canceling promises that we want to discard when the + //! connection is destroyed. This is used to interrupt method calls that are + //! still executing at time of disconnection. + kj::Canceler m_canceler; + //! Cleanup functions to run if connection is broken unexpectedly. List //! will be empty if all ProxyClient are destroyed cleanly before the //! connection is destroyed. @@ -675,6 +713,70 @@ struct ThreadContext bool loop_thread = false; }; +template +kj::Promise ProxyServer::post(Fn&& fn) +{ + auto ready = kj::newPromiseAndFulfiller(); // Signaled when waiter is ready to post again. + auto cancel_monitor_ptr = kj::heap(); + CancelMonitor& cancel_monitor = *cancel_monitor_ptr; + // Keep a reference to the ProxyServer instance by assigning it to + // the self variable. ProxyServer instances are reference-counted and if the + // client drops its reference, this variable keeps the instance alive until + // the thread finishes executing. The self variable needs to be destroyed on + // the event loop thread so it is freed in a sync() call below. + auto self = thisCap(); + auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable { + auto result = kj::newPromiseAndFulfiller(); // Signaled when fn() is called, with its return value. + bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable { + // Fulfill ready.promise now, as soon as the Waiter starts executing + // this lambda, so the next ProxyServer::post() call can + // immediately call waiter->post(). It is important to do this + // before calling fn() because fn() can make an IPC call back to the + // client, which can make another IPC call to this server thread. + // (This typically happens when IPC methods take std::function + // parameters.) When this happens the second call to the server + // thread should not be blocked waiting for the first call. + m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable { + ready_fulfiller->fulfill(); + ready_fulfiller = nullptr; + }); + std::optional result_value; + kj::Maybe exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })}; + m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable { + // Destroy CancelMonitor here before fulfilling or rejecting the + // promise so it doesn't get triggered when the promise is + // destroyed. + cancel_monitor_ptr = nullptr; + // Send results to the fulfiller. Technically it would be ok to + // skip this if promise was canceled, but it's simpler to just + // do it unconditionally. + KJ_IF_MAYBE(e, exception) { + assert(!result_value); + result_fulfiller->reject(kj::mv(*e)); + } else { + assert(result_value); + result_fulfiller->fulfill(kj::mv(*result_value)); + result_value.reset(); + } + result_fulfiller = nullptr; + // Use evalLater to destroy the ProxyServer self + // reference, if it is the last reference, because the + // ProxyServer destructor needs to join the thread, + // which can't happen until this sync() block has exited. + m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {})); + }); + }); + // Assert that calling Waiter::post did not fail. It could only return + // false if a new function was posted before the previous one finished + // executing, but new functions are only posted when m_thread_ready is + // signaled, so this should never happen. + assert(posted); + return kj::mv(result.promise); + }).attach(kj::heap(cancel_monitor)); + m_thread_ready = kj::mv(ready.promise); + return ret; +} + //! Given stream file descriptor, make a new ProxyClient object to send requests //! over the stream. Also create a new Connection object embedded in the //! client that is freed when the client is closed. diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 7301aa59f4a..5d38048498f 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -445,9 +445,36 @@ struct ServerCall template decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const { - return ProxyServerMethodTraits::invoke( - server_context, - std::forward(args)...); + // If cancel_lock is set, release it while executing the method, and + // reacquire it afterwards. The lock is needed to prevent params and + // response structs from being deleted by the event loop thread if the + // request is canceled, so it is only needed before and after method + // execution. It is important to release the lock during execution + // because the method can take arbitrarily long to return and the event + // loop will need the lock itself in on_cancel if the call is canceled. + if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock(); + return TryFinally( + [&]() -> decltype(auto) { + return ProxyServerMethodTraits< + typename decltype(server_context.call_context.getParams())::Reads + >::invoke(server_context, std::forward(args)...); + }, + [&] { + if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock(); + // If the IPC request was canceled, throw InterruptException + // because there is no point continuing and trying to fill the + // call_context.getResults() struct. It's also important to stop + // executing because the connection may have been destroyed as + // described in https://github.com/bitcoin/bitcoin/issues/34250 + // and there could be invalid references to the destroyed + // Connection object if this continued. + // If the IPC method itself threw an exception, the + // InterruptException thrown below will take precedence over it. + // Since the call has been canceled that exception can't be + // returned to the caller, so it needs to be discarded like + // other result values. + if (server_context.request_canceled) throw InterruptException{"canceled"}; + }); } }; diff --git a/include/mp/proxy.h b/include/mp/proxy.h index fff511fde06..c55380c1c9c 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -153,6 +153,7 @@ public: ProxyServerBase(std::shared_ptr impl, Connection& connection); virtual ~ProxyServerBase(); void invokeDestroy(); + using Interface_::Server::thisCap; /** * Implementation pointer that may or may not be owned and deleted when this diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 09ac1790a73..72c39630d00 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -8,6 +8,8 @@ #include #include +#include + namespace mp { template void CustomBuildField(TypeList<>, @@ -26,7 +28,7 @@ void CustomBuildField(TypeList<>, // future calls over this connection can reuse it. auto [callback_thread, _]{SetThread( GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection, - [&] { return connection.m_threads.add(kj::heap>(thread_context, std::thread{})); })}; + [&] { return connection.m_threads.add(kj::heap>(connection, thread_context, std::thread{})); })}; // Call remote ThreadMap.makeThread function so server will create a // dedicated worker thread to run function calls from this thread. Store the @@ -61,11 +63,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& { const auto& params = server_context.call_context.getParams(); Context::Reader context_arg = Accessor::get(params); - auto future = kj::newPromiseAndFulfiller(); auto& server = server_context.proxy_server; int req = server_context.req; - auto invoke = [fulfiller = kj::mv(future.fulfiller), - call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable { + // Keep a reference to the ProxyServer instance by assigning it to the self + // variable. ProxyServer instances are reference-counted and if the client + // drops its reference and the IPC call is canceled, this variable keeps the + // instance alive until the method finishes executing. The self variable + // needs to be destroyed on the event loop thread so it is freed in a sync() + // call below. + auto self = server.thisCap(); + auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { + MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; const auto& params = call_context.getParams(); Context::Reader context_arg = Accessor::get(params); ServerContext server_context{server, call_context, req}; @@ -90,8 +98,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& auto& thread_context = g_thread_context; auto& request_threads = thread_context.request_threads; ConnThread request_thread; - bool inserted; + bool inserted{false}; + Mutex cancel_mutex; + Lock cancel_lock{cancel_mutex}; + server_context.cancel_lock = &cancel_lock; server.m_context.loop->sync([&] { + // Detect request being canceled before it executes. + if (cancel_monitor.m_canceled) { + server_context.request_canceled = true; + return; + } + // Detect request being canceled while it executes. + assert(!cancel_monitor.m_on_cancel); + cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() { + MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; + // Lock cancel_mutex here to block the event loop + // thread and prevent it from deleting the request's + // params and response structs while the execution + // thread is accessing them. Because this lock is + // released before the event loop thread does delete + // the structs, the mutex does not provide any + // protection from the event loop deleting the + // structs _before_ the execution thread acquires + // it. So in addition to locking the mutex, the + // execution thread always checks request_canceled + // as well before accessing the structs. + Lock cancel_lock{cancel_mutex}; + server_context.request_canceled = true; + }; + // Update requests_threads map if not canceled. std::tie(request_thread, inserted) = SetThread( GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, [&] { return context_arg.getCallbackThread(); }); @@ -103,13 +138,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // recursive call (IPC call calling back to the caller which // makes another IPC call), so avoid modifying the map. const bool erase_thread{inserted}; - KJ_DEFER(if (erase_thread) { + KJ_DEFER( + // Release the cancel lock before calling loop->sync and + // waiting for the event loop thread, because if a + // cancellation happened, it needs to run the on_cancel + // callback above. It's safe to release cancel_lock at + // this point because the fn.invoke() call below will be + // finished and no longer accessing the params or + // results structs. + cancel_lock.m_lock.unlock(); // Erase the request_threads entry on the event loop // thread with loop->sync(), so if the connection is // broken there is not a race between this thread and // the disconnect handler trying to destroy the thread // client object. server.m_context.loop->sync([&] { + auto self_dispose{kj::mv(self)}; + if (erase_thread) { // Look up the thread again without using existing // iterator since entry may no longer be there after // a disconnect. Destroy node after releasing @@ -121,51 +166,52 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& Lock lock(thread_context.waiter->m_mutex); removed = request_threads.extract(server.m_context.connection); } + } }); - }); - fn.invoke(server_context, args...); - } - KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { - server.m_context.loop->sync([&] { - auto fulfiller_dispose = kj::mv(fulfiller); - fulfiller_dispose->fulfill(kj::mv(call_context)); - }); - })) - { - server.m_context.loop->sync([&]() { - auto fulfiller_dispose = kj::mv(fulfiller); - fulfiller_dispose->reject(kj::mv(*exception)); - }); + ); + if (server_context.request_canceled) { + MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed"; + } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{ + try { + fn.invoke(server_context, args...); + } catch (const InterruptException& e) { + MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")"; + } + })) { + MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")"; + throw kj::mv(*exception); + } + // End of scope: if KJ_DEFER was reached, it runs here } + return call_context; }; // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). auto thread_client = context_arg.getThread(); - return server.m_context.connection->m_threads.getLocalServer(thread_client) + auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { // Assuming the thread object is found, pass it a pointer to the // `invoke` lambda above which will invoke the function on that // thread. KJ_IF_MAYBE (thread_server, perhaps) { - const auto& thread = static_cast&>(*thread_server); + auto& thread = static_cast&>(*thread_server); MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; - if (!thread.m_thread_context.waiter->post(std::move(invoke))) { - MP_LOG(*server.m_context.loop, Log::Error) - << "IPC server error request #" << req - << " {" << thread.m_thread_context.thread_name << "}" << ", thread busy"; - throw std::runtime_error("thread busy"); - } + return thread.template post(std::move(invoke)); } else { MP_LOG(*server.m_context.loop, Log::Error) << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } - }) - // Wait for the invocation to finish before returning to the caller. - .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); }); + }); + // Use connection m_canceler object to cancel the result promise if the + // connection is destroyed. (By default Cap'n Proto does not cancel requests + // on disconnect, since it's possible clients might want to make requests + // and immediately disconnect without waiting for results, but not want the + // requests to be canceled.) + return server.m_context.connection->m_canceler.wrap(kj::mv(result)); } } // namespace mp diff --git a/include/mp/util.h b/include/mp/util.h index 6cd11fda2e3..a3db1282bbc 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -209,6 +210,38 @@ void Unlock(Lock& lock, Callback&& callback) callback(); } +//! Invoke a function and run a follow-up action before returning the original +//! result. +//! +//! This can be used similarly to KJ_DEFER to run cleanup code, but works better +//! if the cleanup function can throw because it avoids clang bug +//! https://github.com/llvm/llvm-project/issues/12658 which skips calling +//! destructors in that case and can lead to memory leaks. Also, if both +//! functions throw, this lets one exception take precedence instead of +//! terminating due to having two active exceptions. +template +decltype(auto) TryFinally(Fn&& fn, After&& after) +{ + bool success{false}; + using R = std::invoke_result_t; + try { + if constexpr (std::is_void_v) { + std::forward(fn)(); + success = true; + std::forward(after)(); + return; + } else { + decltype(auto) result = std::forward(fn)(); + success = true; + std::forward(after)(); + return result; + } + } catch (...) { + if (!success) std::forward(after)(); + throw; + } +} + //! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}". std::string ThreadName(const char* exe_name); @@ -241,6 +274,69 @@ inline char* CharCast(unsigned char* c) { return (char*)c; } inline const char* CharCast(const char* c) { return c; } inline const char* CharCast(const unsigned char* c) { return (const char*)c; } +//! Exception thrown from code executing an IPC call that is interrupted. +struct InterruptException final : std::exception { + explicit InterruptException(std::string message) : m_message(std::move(message)) {} + const char* what() const noexcept override { return m_message.c_str(); } + std::string m_message; +}; + +class CancelProbe; + +//! Helper class that detects when a promise is canceled. Used to detect +//! canceled requests and prevent potential crashes on unclean disconnects. +//! +//! In the future, this could also be used to support a way for wrapped C++ +//! methods to detect cancellation (like approach #4 in +//! https://github.com/bitcoin/bitcoin/issues/33575). +class CancelMonitor +{ +public: + inline ~CancelMonitor(); + inline void promiseDestroyed(CancelProbe& probe); + + bool m_canceled{false}; + std::function m_on_cancel; + CancelProbe* m_probe{nullptr}; +}; + +//! Helper object to attach to a promise and update a CancelMonitor. +class CancelProbe +{ +public: + CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor) + { + assert(!monitor.m_probe); + monitor.m_probe = this; + } + ~CancelProbe() + { + if (m_monitor) m_monitor->promiseDestroyed(*this); + } + CancelMonitor* m_monitor; +}; + +CancelMonitor::~CancelMonitor() +{ + if (m_probe) { + assert(m_probe->m_monitor == this); + m_probe->m_monitor = nullptr; + m_probe = nullptr; + } +} + +void CancelMonitor::promiseDestroyed(CancelProbe& probe) +{ + // If promise is being destroyed, assume the promise has been canceled. In + // theory this method could be called when a promise was fulfilled or + // rejected rather than canceled, but it's safe to assume that's not the + // case because the CancelMonitor class is meant to be used inside code + // fulfilling or rejecting the promise and destroyed before doing so. + assert(m_probe == &probe); + m_canceled = true; + if (m_on_cancel) m_on_cancel(); + m_probe = nullptr; +} } // namespace mp #endif // MP_UTIL_H diff --git a/include/mp/version.h b/include/mp/version.h index 01583ad95a9..a6b0096feb8 100644 --- a/include/mp/version.h +++ b/include/mp/version.h @@ -5,22 +5,30 @@ #ifndef MP_VERSION_H #define MP_VERSION_H -//! Major version number. Should be incremented in the master branch before -//! changes that introduce major new features or break API compatibility, if -//! there are clients relying on the previous API. (If an API changes multiple -//! times and nothing uses the intermediate changes, it is sufficient to -//! increment this only once before the first change.) +//! @file +//! @brief Major and minor version numbers +//! +//! Versioning uses a cruder form of SemVer where the major number is +//! incremented with all significant changes, regardless of whether they are +//! backward compatible, and the minor number is treated like a patch level and +//! only incremented when a fix or backport is applied to an old branch. + +//! Major version number. Should be incremented after any release or external +//! usage of the library (like a subtree update) so the previous number +//! identifies that release. Should also be incremented before any change that +//! breaks backward compatibility or introduces nontrivial features, so +//! downstream code can use it to detect compatibility. //! //! Each time this is incremented, a new stable branch should be created. E.g. //! when this is incremented to 8, a "v7" stable branch should be created //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the -//! previous version, and backported to the stable branch before it is tagged. -#define MP_MAJOR_VERSION 7 +//! previous version. +#define MP_MAJOR_VERSION 8 //! Minor version number. Should be incremented in stable branches after -//! backporting changes. The /doc/versions.md file in the master and stable -//! branches should also be updated to list the new minor version. +//! backporting changes. The /doc/versions.md file should also be updated to +//! list the new minor version. #define MP_MINOR_VERSION 0 #endif // MP_VERSION_H diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index cedb4dc42f9..53d3fd36efa 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix, cpp_server << "#include \n"; cpp_server << "#include \n"; cpp_server << "#include \n"; + cpp_server << "#include \n"; cpp_server << "#include \n"; cpp_server << "#include \n"; cpp_server << "#include <" << PROXY_TYPES << ">\n"; diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 9d81284ea4d..da22ae62c95 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -38,7 +38,7 @@ namespace mp { -thread_local ThreadContext g_thread_context; +thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal) void LoggingErrorHandler::taskFailed(kj::Exception&& exception) { @@ -87,6 +87,10 @@ Connection::~Connection() // event loop thread, and if there was a remote disconnect, this is called // by an onDisconnect callback directly from the event loop thread. assert(std::this_thread::get_id() == m_loop->m_thread_id); + + // Try to cancel any calls that may be executing. + m_canceler.cancel("Interrupted by disconnect"); + // Shut down RPC system first, since this will garbage collect any // ProxyServer objects that were not freed before the connection was closed. // Typically all ProxyServer objects associated with this connection will be @@ -362,8 +366,8 @@ ProxyClient::~ProxyClient() } } -ProxyServer::ProxyServer(ThreadContext& thread_context, std::thread&& thread) - : m_thread_context(thread_context), m_thread(std::move(thread)) +ProxyServer::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread) + : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread)) { assert(m_thread_context.waiter.get() != nullptr); } @@ -418,7 +422,7 @@ kj::Promise ProxyServer::makeThread(MakeThreadContext context) // is just waiter getting set to null.) g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); }); - auto thread_server = kj::heap>(*thread_context.get_future().get(), std::move(thread)); + auto thread_server = kj::heap>(m_connection, *thread_context.get_future().get(), std::move(thread)); auto thread_client = m_connection.m_threads.add(kj::mv(thread_server)); context.getResults().setResult(kj::mv(thread_client)); return kj::READY_NOW; diff --git a/test/mp/test/foo.capnp b/test/mp/test/foo.capnp index 67bc8e4fed3..5bdb5754bde 100644 --- a/test/mp/test/foo.capnp +++ b/test/mp/test/foo.capnp @@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") { passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32); callFn @17 () -> (); callFnAsync @18 (context :Proxy.Context) -> (); + callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32); } interface FooCallback $Proxy.wrap("mp::test::FooCallback") { diff --git a/test/mp/test/foo.h b/test/mp/test/foo.h index 8415f6f0ac6..73f43da3531 100644 --- a/test/mp/test/foo.h +++ b/test/mp/test/foo.h @@ -83,7 +83,9 @@ public: std::shared_ptr m_callback; void callFn() { assert(m_fn); m_fn(); } void callFnAsync() { assert(m_fn); m_fn(); } + int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); } std::function m_fn; + std::function m_int_fn; }; } // namespace test diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index aa155685977..b8df4677ca0 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -16,9 +18,7 @@ #include #include #include -#include #include -#include #include #include #include @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -98,7 +97,7 @@ public: client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(), client_connection.get(), /* destroy_connection= */ client_owns_connection); if (client_owns_connection) { - client_connection.release(); + (void)client_connection.release(); } else { client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); }; } @@ -317,7 +316,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") signal.set_value(); } -KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error") +KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; ProxyClient* foo = setup.client.get(); @@ -336,51 +335,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error") request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client; }); - setup.server->m_impl->m_fn = [&] { - try - { - signal.get_future().get(); - } - catch (const std::future_error& e) - { - KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved)); - } + // Call callIntFnAsync 3 times with n=100, 200, 300 + std::atomic expected = 100; + + setup.server->m_impl->m_int_fn = [&](int n) { + assert(n == expected); + expected += 100; + return n; }; auto client{foo->m_client}; - bool caught_thread_busy = false; - // NOTE: '3' was chosen because it was the lowest number - // of simultaneous calls required to reliably catch a "thread busy" error std::atomic running{3}; foo->m_context.loop->sync([&] { for (size_t i = 0; i < running; i++) { - auto request{client.callFnAsyncRequest()}; + auto request{client.callIntFnAsyncRequest()}; auto context{request.initContext()}; context.setCallbackThread(*callback_thread); context.setThread(*request_thread); + request.setArg(100 * (i+1)); foo->m_context.loop->m_task_set->add(request.send().then( - [&](auto&& results) { + [&running, &tc, i](auto&& results) { + assert(results.getResult() == static_cast(100 * (i+1))); running -= 1; tc.waiter->m_cv.notify_all(); - }, - [&](kj::Exception&& e) { - KJ_EXPECT(std::string_view{e.getDescription().cStr()} == - "remote exception: std::exception: thread busy"); - caught_thread_busy = true; - running -= 1; - signal.set_value(); - tc.waiter->m_cv.notify_all(); - } - )); + })); } }); { Lock lock(tc.waiter->m_mutex); tc.waiter->wait(lock, [&running] { return running == 0; }); } - KJ_EXPECT(caught_thread_busy); + KJ_EXPECT(expected == 400); } } // namespace test From 8fe91f37194edcca1b7dfdd06bd0d4f5b2154e9b Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Tue, 20 Jan 2026 23:36:00 -0500 Subject: [PATCH 3/3] test: Updates needed after bitcoin-core/libmultiprocess#240 Upstream PR bitcoin-core/libmultiprocess#240 fixed various issues which require updates to python IPC tests. Those changes are made in this commit. --- test/functional/interface_ipc.py | 43 ++++++++++--------------- test/functional/interface_ipc_mining.py | 18 +++++------ 2 files changed, 25 insertions(+), 36 deletions(-) diff --git a/test/functional/interface_ipc.py b/test/functional/interface_ipc.py index f90f72b0ece..1e279e00cb3 100755 --- a/test/functional/interface_ipc.py +++ b/test/functional/interface_ipc.py @@ -4,8 +4,6 @@ # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the IPC (multiprocess) interface.""" import asyncio -import http.client -import re from contextlib import ExitStack from test_framework.test_framework import BitcoinTestFramework @@ -86,9 +84,9 @@ class IPCInterfaceTest(BitcoinTestFramework): def run_unclean_disconnect_test(self): """Test behavior when disconnecting during an IPC call that later - returns a non-null interface pointer. Currently this behavior causes a - crash as reported https://github.com/bitcoin/bitcoin/issues/34250, but a - followup will change this behavior.""" + returns a non-null interface pointer. This used to cause a crash as + reported https://github.com/bitcoin/bitcoin/issues/34250, but now just + results in a cancellation log message""" node = self.nodes[0] self.log.info("Running disconnect during BlockTemplate.waitNext") timeout = self.rpc_timeout * 1000.0 @@ -110,28 +108,22 @@ class IPCInterfaceTest(BitcoinTestFramework): with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): promise = template.waitNext(ctx, waitoptions) await asyncio.sleep(0.1) - disconnected_log_check.enter_context(node.assert_debug_log(expected_msgs=["IPC server: socket disconnected"], timeout=2)) + disconnected_log_check.enter_context(node.assert_debug_log(expected_msgs=["IPC server: socket disconnected", "canceled while executing"], timeout=2)) del promise asyncio.run(capnp.run(async_routine())) # Wait for socket disconnected log message, then generate a block to - # cause the waitNext() call to return a new template. This will cause a - # crash and disconnect with error output. - disconnected_log_check.close() - try: + # cause the waitNext() call to return a new template. Look for a + # canceled IPC log message after waitNext returns. + with node.assert_debug_log(expected_msgs=["interrupted (canceled)"], timeout=2): + disconnected_log_check.close() self.generate(node, 1) - except (http.client.RemoteDisconnected, BrokenPipeError, ConnectionResetError): - pass - node.wait_until_stopped(expected_ret_code=(-11, -6, 1, 66), expected_stderr=re.compile("")) - self.start_node(0) def run_thread_busy_test(self): """Test behavior when sending multiple calls to the same server thread which used to cause a crash as reported - https://github.com/bitcoin/bitcoin/issues/33923 and currently causes a - thread busy error. A future change will make this just queue the calls - for execution and not trigger any error""" + https://github.com/bitcoin/bitcoin/issues/33923.""" node = self.nodes[0] self.log.info("Running thread busy test") timeout = self.rpc_timeout * 1000.0 @@ -151,27 +143,26 @@ class IPCInterfaceTest(BitcoinTestFramework): waitoptions.feeThreshold = 1 # Make multiple waitNext calls where the first will start to - # execute, the second will be posted waiting to execute, and the - # third will fail to execute because the execution thread is busy. + # execute, and the second and third will be posted waiting to + # execute. Previously, the third call would fail calling + # mp::Waiter::post() because the waiting function slot is occupied, + # but now posts are queued. with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): promise1 = template.waitNext(ctx, waitoptions) await asyncio.sleep(0.1) with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): promise2 = template.waitNext(ctx, waitoptions) await asyncio.sleep(0.1) - try: - await template.waitNext(ctx, waitoptions) - except capnp.lib.capnp.KjException as e: - assert_equal(e.description, "remote exception: std::exception: thread busy") - assert_equal(e.type, "FAILED") - else: - raise AssertionError("Expected thread busy exception") + with node.assert_debug_log(expected_msgs=["BlockTemplate.waitNext", "IPC server post request"], timeout=2): + promise3 = template.waitNext(ctx, waitoptions) + await asyncio.sleep(0.1) # Generate a new block to make the active waitNext calls return, then clean up. with node.assert_debug_log(expected_msgs=["IPC server send response"], timeout=2): self.generate(node, 1, sync_fun=self.no_op) await ((await promise1).result).destroy(ctx) await ((await promise2).result).destroy(ctx) + await ((await promise3).result).destroy(ctx) await template.destroy(ctx) asyncio.run(capnp.run(async_routine())) diff --git a/test/functional/interface_ipc_mining.py b/test/functional/interface_ipc_mining.py index 2221d462879..72919b0cb50 100755 --- a/test/functional/interface_ipc_mining.py +++ b/test/functional/interface_ipc_mining.py @@ -6,7 +6,7 @@ import asyncio from contextlib import AsyncExitStack from io import BytesIO -import re +import platform from test_framework.blocktools import NULL_OUTPOINT from test_framework.messages import ( MAX_BLOCK_WEIGHT, @@ -245,17 +245,15 @@ class IPCMiningTest(BitcoinTestFramework): await mining.createNewBlock(ctx, opts) raise AssertionError("createNewBlock unexpectedly succeeded") except capnp.lib.capnp.KjException as e: - if e.type == "DISCONNECTED": - # The remote exception isn't caught currently and leads to a - # std::terminate call. Just detect and restart in this case. - # This bug is fixed with - # https://github.com/bitcoin-core/libmultiprocess/pull/218 - assert_equal(e.description, "Peer disconnected.") - self.nodes[0].wait_until_stopped(expected_ret_code=(-11, -6, 1, 66), expected_stderr=re.compile("")) - self.start_node(0) + if e.description == "remote exception: unknown non-KJ exception of type: kj::Exception": + # macOS + REDUCE_EXPORTS bug: Cap'n Proto fails to recognize + # its own exception type and returns a generic error instead. + # https://github.com/bitcoin/bitcoin/pull/34422#discussion_r2863852691 + # Assert this only occurs on Darwin until fixed. + assert_equal(platform.system(), "Darwin") else: assert_equal(e.description, "remote exception: std::exception: block_reserved_weight (0) must be at least 2000 weight units") - assert_equal(e.type, "FAILED") + assert_equal(e.type, "FAILED") asyncio.run(capnp.run(async_routine()))