From e5eba302cddfe7fe3ed020470f98311803a0b1b4 Mon Sep 17 00:00:00 2001 From: Enoch Azariah Date: Wed, 3 Jun 2026 14:32:34 +0100 Subject: [PATCH 1/4] doc/version: Bump version 11 > 12 --- doc/versions.md | 9 ++++++++- include/mp/version.h | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/doc/versions.md b/doc/versions.md index 3cfa28e3..d1eb2c66 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -7,9 +7,16 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v11 +## v12 - Current unstable version. +## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) +- Tolerates unexpected exceptions in event loop `post()` callbacks. +- Tolerates exceptions from remote destroy during cleanup in `ProxyClient`. +- Supports primitive `std::optional` struct fields in the code generator (`mpgen`). +- Adds `TypeName()` and improves debug log coverage for Proxy object lifecycle. +- Updates build compatibility with recent Nix and CMake versions. + ## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) - Increases spawn test timeout to avoid spurious failures. - Uses `throwRecoverableException` instead of raw `throw` to improve runtime error messages in macOS builds. diff --git a/include/mp/version.h b/include/mp/version.h index 423ed460..4587a288 100644 --- a/include/mp/version.h +++ b/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 11 +#define MP_MAJOR_VERSION 12 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to From 40b2e97b272b67ccf4d4a5b9c9255af66699ec7d Mon Sep 17 00:00:00 2001 From: Enoch Azariah Date: Thu, 9 Apr 2026 10:49:30 +0100 Subject: [PATCH 2/4] test: add dedicated ListenConnections coverage Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets. The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits. --- include/mp/proxy-io.h | 4 + test/CMakeLists.txt | 1 + test/mp/test/listen_tests.cpp | 179 ++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 test/mp/test/listen_tests.cpp diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 092ea42e..e8cd3e7a 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -356,6 +356,9 @@ class EventLoop //! Hook called on the worker thread just before returning results. std::function testing_hook_async_request_done; + + //! Hook called on the server thread when the client has connected. + std::function testing_hook_connected; }; //! Single element task queue used to handle recursive capnp calls. (If the @@ -842,6 +845,7 @@ void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init }); auto it = loop.m_incoming_connections.begin(); MP_LOG(loop, Log::Info) << "IPC server: socket connected."; + if (loop.testing_hook_connected) loop.testing_hook_connected(); it->onDisconnect([&loop, it] { MP_LOG(loop, Log::Info) << "IPC server: socket disconnected."; loop.m_incoming_connections.erase(it); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1f21ba44..13246293 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test) ${MP_PROXY_HDRS} mp/test/foo-types.h mp/test/foo.h + mp/test/listen_tests.cpp mp/test/spawn_tests.cpp mp/test/test.cpp ) diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp new file mode 100644 index 00000000..86e64217 --- /dev/null +++ b/test/mp/test/listen_tests.cpp @@ -0,0 +1,179 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include + +namespace mp { +namespace test { +namespace { + +class UnixListener +{ +public: + UnixListener() + { + char dir_template[] = "/tmp/mptest-listener-XXXXXX"; + char* dir = mkdtemp(dir_template); + KJ_REQUIRE(dir != nullptr); + m_dir = dir; + m_path = m_dir + "/socket"; + + m_fd = socket(AF_UNIX, SOCK_STREAM, 0); + KJ_REQUIRE(m_fd >= 0); + + sockaddr_un addr{}; + addr.sun_family = AF_UNIX; + KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path)); + std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1); + KJ_REQUIRE(bind(m_fd, reinterpret_cast(&addr), sizeof(addr)) == 0); + KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0); + } + + ~UnixListener() + { + if (m_fd >= 0) close(m_fd); + if (!m_path.empty()) unlink(m_path.c_str()); + if (!m_dir.empty()) rmdir(m_dir.c_str()); + } + + int release() + { + int fd = m_fd; + m_fd = -1; + return fd; + } + + int Connect() const + { + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + KJ_REQUIRE(fd >= 0); + + sockaddr_un addr{}; + addr.sun_family = AF_UNIX; + KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path)); + std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1); + KJ_REQUIRE(connect(fd, reinterpret_cast(&addr), sizeof(addr)) == 0); + return fd; + } + +private: + int m_fd{-1}; + std::string m_dir; + std::string m_path; +}; + +class ClientSetup +{ +public: + explicit ClientSetup(int fd) + : thread([this, fd] { + EventLoop loop("mptest-client", [](mp::LogMessage log) { + KJ_LOG(INFO, log.level, log.message); + if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); + }); + client_promise.set_value(ConnectStream(loop, fd)); + loop.loop(); + }) + { + client = client_promise.get_future().get(); + } + + ~ClientSetup() + { + client.reset(); + thread.join(); + } + + std::promise>> client_promise; + std::unique_ptr> client; + + //! Thread variable should be after other struct members so the thread does + //! not start until the other members are initialized. + std::thread thread; +}; + +class ListenSetup +{ +public: + ListenSetup() + : thread([this] { + EventLoop loop("mptest-server", [this](mp::LogMessage log) { + KJ_LOG(INFO, log.level, log.message); + if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); + }); + loop.testing_hook_connected = [&] { + std::lock_guard lock(counter_mutex); + ++connected_count; + counter_cv.notify_all(); + }; + FooImplementation foo; + ListenConnections(loop, listener.release(), foo); + ready_promise.set_value(); + loop.loop(); + }) + { + ready_promise.get_future().get(); + } + + ~ListenSetup() + { + thread.join(); + } + + void WaitForConnectedCount(size_t expected_count) + { + std::unique_lock lock(counter_mutex); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const bool matched = counter_cv.wait_until(lock, deadline, [&] { + return connected_count >= expected_count; + }); + KJ_REQUIRE(matched); + } + + UnixListener listener; + std::promise ready_promise; + std::mutex counter_mutex; + std::condition_variable counter_cv; + size_t connected_count{0}; + //! Thread variable should be after other struct members so the thread does + //! not start until the other members are initialized. + std::thread thread; + +}; + +KJ_TEST("ListenConnections accepts incoming connections") +{ + ListenSetup setup; + auto client = std::make_unique(setup.listener.Connect()); + + setup.WaitForConnectedCount(1); + KJ_EXPECT(client->client->add(1, 2) == 3); +} + +} // namespace +} // namespace test +} // namespace mp From 05ea22661d5425a3e16e2498359730537ce25269 Mon Sep 17 00:00:00 2001 From: Enoch Azariah Date: Thu, 9 Apr 2026 10:49:30 +0100 Subject: [PATCH 3/4] proxy: add local connection limit to ListenConnections Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects. Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state. Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects. --- doc/usage.md | 2 +- doc/versions.md | 3 ++ include/mp/proxy-io.h | 78 +++++++++++++++++++++++++------ include/mp/proxy-types.h | 9 +++- test/mp/test/listen_tests.cpp | 88 +++++++++++++++++++++++++++++++++-- 5 files changed, 159 insertions(+), 21 deletions(-) diff --git a/doc/usage.md b/doc/usage.md index f387db4d..df103e9f 100644 --- a/doc/usage.md +++ b/doc/usage.md @@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient` and `mp::ProxyServer` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets. -The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller. +The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller. ## Example diff --git a/doc/versions.md b/doc/versions.md index d1eb2c66..67b37651 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -9,6 +9,9 @@ include. ## v12 - Current unstable version. +- Adds an optional per-listener `max_connections` parameter to `ListenConnections()` + so servers can stop accepting new connections when a local connection cap is reached, + and resume accepting after existing connections disconnect. ## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) - Tolerates unexpected exceptions in event loop `post()` callbacks. diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index e8cd3e7a..1907a32a 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -13,7 +13,9 @@ #include #include +#include #include +#include #include #include #include @@ -834,8 +836,8 @@ std::unique_ptr> ConnectStream(EventLoop& loop, int f //! handles requests from the stream by calling the init object. Embed the //! ProxyServer in a Connection object that is stored and erased if //! disconnected. This should be called from the event loop thread. -template -void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init) +template +void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init, OnDisconnect&& on_disconnect) { loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) { // Disable deleter so proxy server object doesn't attempt to delete the @@ -846,23 +848,66 @@ void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init auto it = loop.m_incoming_connections.begin(); MP_LOG(loop, Log::Info) << "IPC server: socket connected."; if (loop.testing_hook_connected) loop.testing_hook_connected(); - it->onDisconnect([&loop, it] { + it->onDisconnect([&loop, it, on_disconnect = std::forward(on_disconnect)]() mutable { MP_LOG(loop, Log::Info) << "IPC server: socket disconnected."; loop.m_incoming_connections.erase(it); + on_disconnect(); }); } -//! Given connection receiver and an init object, handle incoming connections by -//! calling _Serve, to create ProxyServer objects and forward requests to the -//! init object. +struct ListenState +{ + explicit ListenState(kj::Own&& listener_, std::optional max_connections_) + : listener(kj::mv(listener_)), max_connections(max_connections_) {} + + kj::Own listener; + std::optional max_connections; + size_t active_connections{0}; +}; + +inline bool _ListenAtCapacity(const ListenState& state) +{ + return state.max_connections && state.active_connections >= *state.max_connections; +} + +//! Return an EventLoopRef if this is a capped listener, to keep the event loop +//! alive while waiting for connections. +//! +//! For uncapped listeners, we do not hold an EventLoopRef because the event +//! loop should automatically exit when the last active client disconnects (when +//! m_num_clients drops to 0). +//! +//! For capped listeners, we stop calling ptr->accept() when at capacity. Since +//! no accept task remains pending in the task set, we must hold an EventLoopRef +//! while listening to keep the event loop alive until the limit is reached. +inline std::unique_ptr _MakeCappedListenerRef(EventLoop& loop, const ListenState& state) +{ + return state.max_connections && *state.max_connections > 0 ? std::make_unique(loop) : nullptr; +} + +//! Given init object and a state object containing a connection receiver, handle +//! incoming connections by calling _Serve, to create ProxyServer objects and +//! forward requests to the init object. template -void _Listen(EventLoop& loop, kj::Own&& listener, InitImpl& init) +void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr& state) { - auto* ptr = listener.get(); + if (_ListenAtCapacity(*state)) return; + + auto* ptr = state->listener.get(); + // Bind the EventLoopRef lifetime to the pending accept task. If the listener + // reaches capacity, _Listen returns early, destroying the accept_ref and + // letting the event loop client count decrement. + auto accept_ref{_MakeCappedListenerRef(loop, *state)}; loop.m_task_set->add(ptr->accept().then( - [&loop, &init, listener = kj::mv(listener)](kj::Own&& stream) mutable { - _Serve(loop, kj::mv(stream), init); - _Listen(loop, kj::mv(listener), init); + [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own&& stream) mutable { + ++state->active_connections; + _Serve(loop, kj::mv(stream), init, [&loop, &init, state] { + const bool resume_accept{_ListenAtCapacity(*state)}; + assert(state->active_connections > 0); + --state->active_connections; + if (resume_accept) _Listen(loop, init, state); + }); + _Listen(loop, init, state); })); } @@ -872,18 +917,21 @@ template void ServeStream(EventLoop& loop, int fd, InitImpl& init) { _Serve( - loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); + loop, + loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), + init, + [] {}); } //! Given listening socket file descriptor and an init object, handle incoming //! connections and requests by calling methods on the Init object. template -void ListenConnections(EventLoop& loop, int fd, InitImpl& init) +void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional max_connections = std::nullopt) { loop.sync([&]() { - _Listen(loop, + _Listen(loop, init, std::make_shared( loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), - init); + max_connections)); }); } diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 1ccf423f..876df3db 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -655,7 +655,14 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>> template void clientDestroy(Client& client) { - MP_LOG(*client.m_context.loop, Log::Debug) << "IPC client destroy " << CxxTypeName(client); + // Lock is needed because the sync cleanup callback on the event loop + // thread can concurrently set m_context.connection to nullptr. + Lock lock{client.m_context.loop->m_mutex}; + if (client.m_context.connection) { + MP_LOG(*client.m_context.loop, Log::Debug) << "IPC client destroy " << CxxTypeName(client); + } else { + KJ_LOG(INFO, "IPC interrupted client destroy", CxxTypeName(client)); + } } template diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp index 86e64217..362882e7 100644 --- a/test/mp/test/listen_tests.cpp +++ b/test/mp/test/listen_tests.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include // IWYU pragma: keep #include #include @@ -119,19 +120,28 @@ class ClientSetup class ListenSetup { public: - ListenSetup() - : thread([this] { + explicit ListenSetup(std::optional max_connections = std::nullopt) + : capped_listener(max_connections.has_value()), thread([this, max_connections] { EventLoop loop("mptest-server", [this](mp::LogMessage log) { KJ_LOG(INFO, log.level, log.message); if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); + if (log.message.find("IPC server: socket disconnected.") != std::string::npos) { + std::lock_guard lock(counter_mutex); + ++disconnected_count; + counter_cv.notify_all(); + } }); loop.testing_hook_connected = [&] { std::lock_guard lock(counter_mutex); ++connected_count; counter_cv.notify_all(); }; + { + std::lock_guard lock(counter_mutex); + event_loop = &loop; + } FooImplementation foo; - ListenConnections(loop, listener.release(), foo); + ListenConnections(loop, listener.release(), foo, max_connections); ready_promise.set_value(); loop.loop(); }) @@ -141,9 +151,23 @@ class ListenSetup ~ListenSetup() { + if (capped_listener) { + EventLoop* loop; + { + std::lock_guard lock(counter_mutex); + loop = event_loop; + } + if (loop) loop->sync([&] { loop->m_task_set.reset(); }); + } thread.join(); } + size_t ConnectedCount() + { + std::lock_guard lock(counter_mutex); + return connected_count; + } + void WaitForConnectedCount(size_t expected_count) { std::unique_lock lock(counter_mutex); @@ -154,15 +178,27 @@ class ListenSetup KJ_REQUIRE(matched); } + void WaitForDisconnectedCount(size_t expected_count) + { + std::unique_lock lock(counter_mutex); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const bool matched = counter_cv.wait_until(lock, deadline, [&] { + return disconnected_count >= expected_count; + }); + KJ_REQUIRE(matched); + } + UnixListener listener; std::promise ready_promise; + bool capped_listener{false}; std::mutex counter_mutex; std::condition_variable counter_cv; + EventLoop* event_loop{nullptr}; size_t connected_count{0}; + size_t disconnected_count{0}; //! Thread variable should be after other struct members so the thread does //! not start until the other members are initialized. std::thread thread; - }; KJ_TEST("ListenConnections accepts incoming connections") @@ -174,6 +210,50 @@ KJ_TEST("ListenConnections accepts incoming connections") KJ_EXPECT(client->client->add(1, 2) == 3); } +KJ_TEST("ListenConnections enforces a local connection limit") +{ + ListenSetup setup(/*max_connections=*/1); + + auto client1 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(1); + KJ_EXPECT(client1->client->add(1, 2) == 3); + + auto client2 = std::make_unique(setup.listener.Connect()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + KJ_EXPECT(setup.ConnectedCount() == 1); + + client1.reset(); + setup.WaitForDisconnectedCount(1); + setup.WaitForConnectedCount(2); + + KJ_EXPECT(client2->client->add(2, 3) == 5); + + client2.reset(); + setup.WaitForDisconnectedCount(2); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto client3 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(3); + KJ_EXPECT(client3->client->add(3, 4) == 7); +} + +KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limit") +{ + ListenSetup setup(/*max_connections=*/2); + + auto client1 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(1); + KJ_EXPECT(client1->client->add(1, 2) == 3); + + client1.reset(); + setup.WaitForDisconnectedCount(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto client2 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(2); + KJ_EXPECT(client2->client->add(2, 3) == 5); +} + } // namespace } // namespace test } // namespace mp From b8453b4b949ad16462342f085c86f793385d93de Mon Sep 17 00:00:00 2001 From: Enoch Azariah Date: Sun, 14 Jun 2026 07:55:12 +0100 Subject: [PATCH 4/4] mpgen: add missing includes for IWYU compliance Generated proxy-types source files use clientDestroy/serverDestroy and arrayPtr from kj/common.h, but only included them transitively. IWYU with --error requires direct includes, causing CI build failure. --- src/mp/gen.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index c8db469d..b650e86f 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -352,7 +352,9 @@ static void Generate(kj::StringPtr src_prefix, cpp_types << "#include <" << include_path << ".h> // IWYU pragma: keep\n"; cpp_types << "#include <" << include_path << ".proxy.h>\n"; cpp_types << "#include <" << include_path << ".proxy-types.h> // IWYU pragma: keep\n"; - cpp_types << "#include <" << PROXY_TYPES << ">\n\n"; + cpp_types << "#include \n"; + cpp_types << "#include <" << PROXY_TYPES << ">\n"; + cpp_types << "#include \"mp/util.h\"\n\n"; cpp_types << "namespace mp {\n"; std::string guard = output_path;