Skip to content

Commit b36e98b

Browse files
committed
proxy: add local connection limit to ListenConnections
Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects. Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state. Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects.
1 parent d0cff01 commit b36e98b

5 files changed

Lines changed: 147 additions & 18 deletions

File tree

doc/usage.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class
1010

1111
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.
1212

13-
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
13+
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
1414

1515
## Example
1616

doc/versions.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ Library versions are tracked with simple
77
Versioning policy is described in the [version.h](../include/mp/version.h)
88
include.
99

10-
## v10
10+
## v11
1111
- Current unstable version.
12+
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
13+
so servers can stop accepting new connections when a local connection cap is reached,
14+
and resume accepting after existing connections disconnect.
15+
16+
## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
17+
- Prior unstable version before local listener connection-limit support.
1218

1319
## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0)
1420
- Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes.

include/mp/proxy-io.h

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
#include <capnp/rpc-twoparty.h>
1414

1515
#include <assert.h>
16+
#include <algorithm>
1617
#include <condition_variable>
18+
#include <cstdlib>
1719
#include <functional>
1820
#include <kj/function.h>
1921
#include <map>
@@ -820,8 +822,8 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
820822
//! handles requests from the stream by calling the init object. Embed the
821823
//! ProxyServer in a Connection object that is stored and erased if
822824
//! disconnected. This should be called from the event loop thread.
823-
template <typename InitInterface, typename InitImpl>
824-
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
825+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
826+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
825827
{
826828
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827829
// Disable deleter so proxy server object doesn't attempt to delete the
@@ -831,23 +833,78 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
831833
});
832834
auto it = loop.m_incoming_connections.begin();
833835
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
834-
it->onDisconnect([&loop, it] {
836+
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
835837
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
836838
loop.m_incoming_connections.erase(it);
839+
on_disconnect();
837840
});
838841
}
839842

840843
//! Given connection receiver and an init object, handle incoming connections by
841844
//! calling _Serve, to create ProxyServer objects and forward requests to the
842845
//! init object.
846+
struct ListenState
847+
{
848+
explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_)
849+
: listener(kj::mv(listener_)), max_connections(max_connections_) {}
850+
851+
kj::Own<kj::ConnectionReceiver> listener;
852+
std::optional<size_t> max_connections;
853+
//! Keeps a capped listener alive after a slot opens and a new accept is
854+
//! posted, so future clients can connect after an idle gap.
855+
std::unique_ptr<EventLoopRef> resume_ref;
856+
size_t active_connections{0};
857+
//! Tracks whether accept() has already been posted. This is needed because
858+
//! active_connections only counts accepted connections, so without a
859+
//! separate flag, nested _Listen() calls could queue multiple pending
860+
//! accepts before active_connections increases.
861+
bool accept_pending{false};
862+
};
863+
864+
template <typename InitInterface, typename InitImpl>
865+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state);
866+
867+
template <typename InitInterface, typename InitImpl>
868+
void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state, kj::Own<kj::AsyncIoStream>&& stream)
869+
{
870+
++state->active_connections;
871+
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
872+
assert(state->active_connections > 0);
873+
--state->active_connections;
874+
_Listen<InitInterface>(loop, init, state);
875+
});
876+
}
877+
878+
inline bool _ListenAtCapacity(const ListenState& state)
879+
{
880+
return state.max_connections && state.active_connections >= *state.max_connections;
881+
}
882+
883+
inline void _KeepCappedListenerAlive(EventLoop& loop, ListenState& state)
884+
{
885+
if (state.max_connections && *state.max_connections > 0 && !state.resume_ref) {
886+
state.resume_ref = std::make_unique<EventLoopRef>(loop);
887+
}
888+
}
889+
843890
template <typename InitInterface, typename InitImpl>
844-
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
891+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
845892
{
846-
auto* ptr = listener.get();
893+
if (state->accept_pending) return;
894+
if (_ListenAtCapacity(*state)) return;
895+
896+
state->accept_pending = true;
897+
auto* ptr = state->listener.get();
898+
auto resume_ref{std::move(state->resume_ref)};
847899
loop.m_task_set->add(ptr->accept().then(
848-
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
849-
_Serve<InitInterface>(loop, kj::mv(stream), init);
850-
_Listen<InitInterface>(loop, kj::mv(listener), init);
900+
[&loop, &init, state, resume_ref = std::move(resume_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
901+
state->accept_pending = false;
902+
_ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
903+
if (_ListenAtCapacity(*state)) {
904+
_KeepCappedListenerAlive(loop, *state);
905+
return;
906+
}
907+
_Listen<InitInterface>(loop, init, state);
851908
}));
852909
}
853910

@@ -857,18 +914,21 @@ template <typename InitInterface, typename InitImpl>
857914
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
858915
{
859916
_Serve<InitInterface>(
860-
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
917+
loop,
918+
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
919+
init,
920+
[] {});
861921
}
862922

863923
//! Given listening socket file descriptor and an init object, handle incoming
864924
//! connections and requests by calling methods on the Init object.
865925
template <typename InitInterface, typename InitImpl>
866-
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
926+
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
867927
{
868928
loop.sync([&]() {
869-
_Listen<InitInterface>(loop,
929+
_Listen<InitInterface>(loop, init, std::make_shared<ListenState>(
870930
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871-
init);
931+
max_connections));
872932
});
873933
}
874934

include/mp/version.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! pointing at the prior merge commit. The /doc/versions.md file should also be
2525
//! updated, noting any significant or incompatible changes made since the
2626
//! previous version.
27-
#define MP_MAJOR_VERSION 10
27+
#define MP_MAJOR_VERSION 11
2828

2929
//! Minor version number. Should be incremented in stable branches after
3030
//! backporting changes. The /doc/versions.md file should also be updated to

test/mp/test/listen_tests.cpp

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <memory>
1919
#include <mp/proxy-io.h>
2020
#include <mutex>
21+
#include <optional>
2122
#include <ratio> // IWYU pragma: keep
2223
#include <stdexcept>
2324
#include <string>
@@ -114,18 +115,26 @@ class ClientSetup
114115
class ListenSetup
115116
{
116117
public:
117-
ListenSetup()
118-
: thread([this] {
118+
explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
119+
: capped_listener(max_connections.has_value()), thread([this, max_connections] {
119120
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
120121
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
121122
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
122123
std::lock_guard<std::mutex> lock(counter_mutex);
123124
++connected_count;
124125
counter_cv.notify_all();
126+
} else if (log.message.find("IPC server: socket disconnected.") != std::string::npos) {
127+
std::lock_guard<std::mutex> lock(counter_mutex);
128+
++disconnected_count;
129+
counter_cv.notify_all();
125130
}
126131
});
132+
{
133+
std::lock_guard<std::mutex> lock(counter_mutex);
134+
event_loop = &loop;
135+
}
127136
FooImplementation foo;
128-
ListenConnections<messages::FooInterface>(loop, listener.release(), foo);
137+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
129138
ready_promise.set_value();
130139
loop.loop();
131140
})
@@ -135,9 +144,23 @@ class ListenSetup
135144

136145
~ListenSetup()
137146
{
147+
if (capped_listener) {
148+
EventLoop* loop;
149+
{
150+
std::lock_guard<std::mutex> lock(counter_mutex);
151+
loop = event_loop;
152+
}
153+
if (loop) loop->sync([&] { loop->m_task_set.reset(); });
154+
}
138155
thread.join();
139156
}
140157

158+
size_t ConnectedCount()
159+
{
160+
std::lock_guard<std::mutex> lock(counter_mutex);
161+
return connected_count;
162+
}
163+
141164
void WaitForConnectedCount(size_t expected_count)
142165
{
143166
std::unique_lock<std::mutex> lock(counter_mutex);
@@ -148,12 +171,25 @@ class ListenSetup
148171
KJ_REQUIRE(matched);
149172
}
150173

174+
void WaitForDisconnectedCount(size_t expected_count)
175+
{
176+
std::unique_lock<std::mutex> lock(counter_mutex);
177+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
178+
const bool matched = counter_cv.wait_until(lock, deadline, [&] {
179+
return disconnected_count >= expected_count;
180+
});
181+
KJ_REQUIRE(matched);
182+
}
183+
151184
UnixListener listener;
152185
std::promise<void> ready_promise;
186+
bool capped_listener{false};
153187
std::thread thread;
154188
std::mutex counter_mutex;
155189
std::condition_variable counter_cv;
190+
EventLoop* event_loop{nullptr};
156191
size_t connected_count{0};
192+
size_t disconnected_count{0};
157193
};
158194

159195
KJ_TEST("ListenConnections accepts incoming connections")
@@ -165,6 +201,33 @@ KJ_TEST("ListenConnections accepts incoming connections")
165201
KJ_EXPECT(client->client->add(1, 2) == 3);
166202
}
167203

204+
KJ_TEST("ListenConnections enforces a local connection limit")
205+
{
206+
ListenSetup setup(/*max_connections=*/1);
207+
208+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
209+
setup.WaitForConnectedCount(1);
210+
KJ_EXPECT(client1->client->add(1, 2) == 3);
211+
212+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
213+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
214+
KJ_EXPECT(setup.ConnectedCount() == 1);
215+
216+
client1.reset();
217+
setup.WaitForDisconnectedCount(1);
218+
setup.WaitForConnectedCount(2);
219+
220+
KJ_EXPECT(client2->client->add(2, 3) == 5);
221+
222+
client2.reset();
223+
setup.WaitForDisconnectedCount(2);
224+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
225+
226+
auto client3 = std::make_unique<ClientSetup>(setup.listener.Connect());
227+
setup.WaitForConnectedCount(3);
228+
KJ_EXPECT(client3->client->add(3, 4) == 7);
229+
}
230+
168231
} // namespace
169232
} // namespace test
170233
} // namespace mp

0 commit comments

Comments
 (0)