Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class

The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.

The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.

## Example

Expand Down
12 changes: 11 additions & 1 deletion doc/versions.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ Library versions are tracked with simple
Versioning policy is described in the [version.h](../include/mp/version.h)
include.

## v11
## v12
- Current unstable version.
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
so servers can stop accepting new connections when a local connection cap is reached,
and resume accepting after existing connections disconnect.

## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0)
- Tolerates unexpected exceptions in event loop `post()` callbacks.
- Tolerates exceptions from remote destroy during cleanup in `ProxyClient`.
- Supports primitive `std::optional` struct fields in the code generator (`mpgen`).
- Adds `TypeName()` and improves debug log coverage for Proxy object lifecycle.
- Updates build compatibility with recent Nix and CMake versions.

## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
- Increases spawn test timeout to avoid spurious failures.
Expand Down
85 changes: 70 additions & 15 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#include <capnp/rpc-twoparty.h>

#include <assert.h>
#include <algorithm>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <kj/function.h>
#include <map>
Expand Down Expand Up @@ -356,6 +358,9 @@ class EventLoop

//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_async_request_done;

//! Hook called on the server thread when the client has connected.
std::function<void()> testing_hook_connected;
};

//! Single element task queue used to handle recursive capnp calls. (If the
Expand Down Expand Up @@ -831,8 +836,8 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
//! handles requests from the stream by calling the init object. Embed the
//! ProxyServer in a Connection object that is stored and erased if
//! disconnected. This should be called from the event loop thread.
template <typename InitInterface, typename InitImpl>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
{
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
// Disable deleter so proxy server object doesn't attempt to delete the
Expand All @@ -842,23 +847,70 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
});
auto it = loop.m_incoming_connections.begin();
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
it->onDisconnect([&loop, it] {
if (loop.testing_hook_connected) loop.testing_hook_connected();
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
on_disconnect();
});
}

//! Given connection receiver and an init object, handle incoming connections by
//! calling _Serve, to create ProxyServer objects and forward requests to the
//! init object.
struct ListenState
{
explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_)
: listener(kj::mv(listener_)), max_connections(max_connections_) {}

kj::Own<kj::ConnectionReceiver> listener;
std::optional<size_t> max_connections;
size_t active_connections{0};
};

template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state);

inline bool _ListenAtCapacity(const ListenState& state)
{
return state.max_connections && state.active_connections >= *state.max_connections;
}

//! Return an EventLoopRef if this is a capped listener, to keep the event loop
//! alive while waiting for connections.
//!
//! For uncapped listeners, we do not hold an EventLoopRef because the event
//! loop should automatically exit when the last active client disconnects (when
//! m_num_clients drops to 0).
//!
//! For capped listeners, we stop calling ptr->accept() when at capacity. Since
//! no accept task remains pending in the task set, we must hold an EventLoopRef
//! while listening to keep the event loop alive until the limit is reached.
inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, const ListenState& state)
{
return state.max_connections && *state.max_connections > 0 ? std::make_unique<EventLoopRef>(loop) : nullptr;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

Would be helpful to have a code comment explaining this logic. I don't think I understand why it avoiding creating an EventLoopRef if there is no connection limit. I'm also not clear on why passing accept_ref around separately is needed. Naively I think I'd just expect ListenState to have an EventLoopRef member.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I added explanatory comments to both _MakeCappedListenerRef. I avoid holding EventLoopRef in the uncapped case because the loop is always listening, and holding a ref would prevent it from automatically exiting when all client connections close.

Passing accept_ref in the callback instead of storing it as a member would bind the ref's lifetime directly to the active accept() task. This means that when the listener reaches capacity, we return early and don't register a new task, which automatically destroys the ref and lets the loop exit.

}

//! Given init object and a state object containing a connection receiver, handle
//! incoming connections by calling _Serve, to create ProxyServer objects and
//! forward requests to the init object.
template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

This function used to have a code comment "//! Given connection receiver and an init object, handle..." that looks like it became detached. Would be good to move it down next this function and update it ("Given init object and a state object containing a connection receiver, handle...").

It might also be clearer to turn this into a ListenState method. (And if doing that, rename ListenState to Listener, rename listener to m_receiver, rename state to self, make ListenAtCapacity another method, etc). Just an idea though, I haven't studied this code enough yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The comment has been moved down above the _Listen function, and its wording has been updated to reflect the new init and state parameters as suggested.

Regarding refactoring _Listen into a member method of ListenState. I decided to keep it as a free function instead.

The _Listen template function registers asynchronous handlers via ptr->accept().then(...) , turning it into a method would require ListenState to inherit from std::enable_shared_from_this to pass self safely into the lambdas.

I think keeping it as a simple struct/free function avoids this boilerplate and remains consistent with the other helper functions in proxy-io.h

{
auto* ptr = listener.get();
if (_ListenAtCapacity(*state)) return;

auto* ptr = state->listener.get();
// Bind the EventLoopRef lifetime to the pending accept task. If the listener
// reaches capacity, _Listen returns early, destroying the accept_ref and
// letting the event loop client count decrement.
auto accept_ref{_MakeCappedListenerRef(loop, *state)};
loop.m_task_set->add(ptr->accept().then(
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
_Serve<InitInterface>(loop, kj::mv(stream), init);
_Listen<InitInterface>(loop, kj::mv(listener), init);
[&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
++state->active_connections;
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
const bool resume_accept{_ListenAtCapacity(*state)};
assert(state->active_connections > 0);
--state->active_connections;
if (resume_accept) _Listen<InitInterface>(loop, init, state);
});
_Listen<InitInterface>(loop, init, state);
Copy link
Copy Markdown
Contributor

@xyzconstant xyzconstant May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the accept_pending check in place, this _Listen call will never be reached.

NOTE: if you apply this patch here (comment), then it will be needed here because we can't rely on the second _Listen call in _ServeAccepted which is executed conditionally after active_connections reached the cap.

}));
}

Expand All @@ -868,18 +920,21 @@ template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
{
_Serve<InitInterface>(
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init,
[] {});
}

//! Given listening socket file descriptor and an init object, handle incoming
//! connections and requests by calling methods on the Init object.
template <typename InitInterface, typename InitImpl>
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
{
loop.sync([&]() {
_Listen<InitInterface>(loop,
_Listen<InitInterface>(loop, init, std::make_shared<ListenState>(
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
max_connections));
});
}

Expand Down
3 changes: 3 additions & 0 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,9 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>>
template <typename Client>
void clientDestroy(Client& client)
{
// Lock is needed because the sync cleanup callback on the event loop
// thread can concurrently set m_context.connection to nullptr.
Lock lock{client.m_context.loop->m_mutex};
if (client.m_context.connection) {
MP_LOG(*client.m_context.loop, Log::Debug) << "IPC client destroy " << CxxTypeName(client);
} else {
Expand Down
2 changes: 1 addition & 1 deletion include/mp/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! pointing at the prior merge commit. The /doc/versions.md file should also be
//! updated, noting any significant or incompatible changes made since the
//! previous version.
#define MP_MAJOR_VERSION 11
#define MP_MAJOR_VERSION 12

//! Minor version number. Should be incremented in stable branches after
//! backporting changes. The /doc/versions.md file should also be updated to
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test)
${MP_PROXY_HDRS}
mp/test/foo-types.h
mp/test/foo.h
mp/test/listen_tests.cpp
mp/test/spawn_tests.cpp
mp/test/test.cpp
)
Expand Down
Loading
Loading