Skip to content

Commit 458cd5a

Browse files
committed
proxy: add local connection limit to ListenConnections
Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects. Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state. Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects.
1 parent 79b2e13 commit 458cd5a

5 files changed

Lines changed: 156 additions & 20 deletions

File tree

doc/usage.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class
1010

1111
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.
1212

13-
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
13+
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
1414

1515
## Example
1616

doc/versions.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ include.
99

1010
## v12
1111
- Current unstable version.
12+
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
13+
so servers can stop accepting new connections when a local connection cap is reached,
14+
and resume accepting after existing connections disconnect.
1215

1316
## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0)
1417
- Tolerates unexpected exceptions in event loop `post()` callbacks.

include/mp/proxy-io.h

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#ifndef MP_PROXY_IO_H
66
#define MP_PROXY_IO_H
77

8+
#include <kj/common.h>
89
#include <mp/proxy.h>
910
#include <mp/util.h>
1011

@@ -13,7 +14,9 @@
1314
#include <capnp/rpc-twoparty.h>
1415

1516
#include <assert.h>
17+
#include <algorithm>
1618
#include <condition_variable>
19+
#include <cstdlib>
1720
#include <functional>
1821
#include <kj/function.h>
1922
#include <map>
@@ -834,8 +837,8 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
834837
//! handles requests from the stream by calling the init object. Embed the
835838
//! ProxyServer in a Connection object that is stored and erased if
836839
//! disconnected. This should be called from the event loop thread.
837-
template <typename InitInterface, typename InitImpl>
838-
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
840+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
841+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
839842
{
840843
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
841844
// Disable deleter so proxy server object doesn't attempt to delete the
@@ -846,23 +849,69 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
846849
auto it = loop.m_incoming_connections.begin();
847850
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
848851
if (loop.testing_hook_connected) loop.testing_hook_connected();
849-
it->onDisconnect([&loop, it] {
852+
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
850853
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
851854
loop.m_incoming_connections.erase(it);
855+
on_disconnect();
852856
});
853857
}
854858

855-
//! Given connection receiver and an init object, handle incoming connections by
856-
//! calling _Serve, to create ProxyServer objects and forward requests to the
857-
//! init object.
859+
struct ListenState
860+
{
861+
explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_)
862+
: listener(kj::mv(listener_)), max_connections(max_connections_) {}
863+
864+
kj::Own<kj::ConnectionReceiver> listener;
865+
std::optional<size_t> max_connections;
866+
size_t active_connections{0};
867+
};
868+
858869
template <typename InitInterface, typename InitImpl>
859-
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
870+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state);
871+
872+
inline bool _ListenAtCapacity(const ListenState& state)
860873
{
861-
auto* ptr = listener.get();
874+
return state.max_connections && state.active_connections >= *state.max_connections;
875+
}
876+
877+
//! Return an EventLoopRef if this is a capped listener, to keep the event loop
878+
//! alive while waiting for connections.
879+
//!
880+
//! For uncapped listeners, we do not hold an EventLoopRef because the event
881+
//! loop should automatically exit when the last active client disconnects (when
882+
//! m_num_clients drops to 0).
883+
//!
884+
//! For capped listeners, we stop calling ptr->accept() when at capacity. Since
885+
//! no accept task remains pending in the task set, we must hold an EventLoopRef
886+
//! while listening to keep the event loop alive until the limit is reached.
887+
inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, const ListenState& state)
888+
{
889+
return state.max_connections && *state.max_connections > 0 ? std::make_unique<EventLoopRef>(loop) : nullptr;
890+
}
891+
892+
//! Given init object and a state object containing a connection receiver, handle
893+
//! incoming connections by calling _Serve, to create ProxyServer objects and
894+
//! forward requests to the init object.
895+
template <typename InitInterface, typename InitImpl>
896+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
897+
{
898+
if (_ListenAtCapacity(*state)) return;
899+
900+
auto* ptr = state->listener.get();
901+
// Bind the EventLoopRef lifetime to the pending accept task. If the listener
902+
// reaches capacity, _Listen returns early, destroying the accept_ref and
903+
// letting the event loop client count decrement.
904+
auto accept_ref{_MakeCappedListenerRef(loop, *state)};
862905
loop.m_task_set->add(ptr->accept().then(
863-
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
864-
_Serve<InitInterface>(loop, kj::mv(stream), init);
865-
_Listen<InitInterface>(loop, kj::mv(listener), init);
906+
[&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
907+
++state->active_connections;
908+
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
909+
const bool resume_accept{_ListenAtCapacity(*state)};
910+
assert(state->active_connections > 0);
911+
--state->active_connections;
912+
if (resume_accept) _Listen<InitInterface>(loop, init, state);
913+
});
914+
_Listen<InitInterface>(loop, init, state);
866915
}));
867916
}
868917

@@ -872,18 +921,21 @@ template <typename InitInterface, typename InitImpl>
872921
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
873922
{
874923
_Serve<InitInterface>(
875-
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
924+
loop,
925+
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
926+
init,
927+
[] {});
876928
}
877929

878930
//! Given listening socket file descriptor and an init object, handle incoming
879931
//! connections and requests by calling methods on the Init object.
880932
template <typename InitInterface, typename InitImpl>
881-
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
933+
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
882934
{
883935
loop.sync([&]() {
884-
_Listen<InitInterface>(loop,
936+
_Listen<InitInterface>(loop, init, std::make_shared<ListenState>(
885937
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
886-
init);
938+
max_connections));
887939
});
888940
}
889941

test/mp/test/listen_tests.cpp

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <memory>
1919
#include <mp/proxy-io.h>
2020
#include <mutex>
21+
#include <optional>
2122
#include <ratio> // IWYU pragma: keep
2223
#include <stdexcept>
2324
#include <string>
@@ -118,19 +119,28 @@ class ClientSetup
118119
class ListenSetup
119120
{
120121
public:
121-
ListenSetup()
122-
: thread([this] {
122+
explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
123+
: capped_listener(max_connections.has_value()), thread([this, max_connections] {
123124
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
124125
KJ_LOG(INFO, log.level, log.message);
125126
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
127+
if (log.message.find("IPC server: socket disconnected.") != std::string::npos) {
128+
std::lock_guard<std::mutex> lock(counter_mutex);
129+
++disconnected_count;
130+
counter_cv.notify_all();
131+
}
126132
});
127133
loop.testing_hook_connected = [&] {
128134
std::lock_guard<std::mutex> lock(counter_mutex);
129135
++connected_count;
130136
counter_cv.notify_all();
131137
};
138+
{
139+
std::lock_guard<std::mutex> lock(counter_mutex);
140+
event_loop = &loop;
141+
}
132142
FooImplementation foo;
133-
ListenConnections<messages::FooInterface>(loop, listener.release(), foo);
143+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
134144
ready_promise.set_value();
135145
loop.loop();
136146
})
@@ -140,9 +150,23 @@ class ListenSetup
140150

141151
~ListenSetup()
142152
{
153+
if (capped_listener) {
154+
EventLoop* loop;
155+
{
156+
std::lock_guard<std::mutex> lock(counter_mutex);
157+
loop = event_loop;
158+
}
159+
if (loop) loop->sync([&] { loop->m_task_set.reset(); });
160+
}
143161
thread.join();
144162
}
145163

164+
size_t ConnectedCount()
165+
{
166+
std::lock_guard<std::mutex> lock(counter_mutex);
167+
return connected_count;
168+
}
169+
146170
void WaitForConnectedCount(size_t expected_count)
147171
{
148172
std::unique_lock<std::mutex> lock(counter_mutex);
@@ -153,15 +177,27 @@ class ListenSetup
153177
KJ_REQUIRE(matched);
154178
}
155179

180+
void WaitForDisconnectedCount(size_t expected_count)
181+
{
182+
std::unique_lock<std::mutex> lock(counter_mutex);
183+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
184+
const bool matched = counter_cv.wait_until(lock, deadline, [&] {
185+
return disconnected_count >= expected_count;
186+
});
187+
KJ_REQUIRE(matched);
188+
}
189+
156190
UnixListener listener;
157191
std::promise<void> ready_promise;
192+
bool capped_listener{false};
158193
std::mutex counter_mutex;
159194
std::condition_variable counter_cv;
195+
EventLoop* event_loop{nullptr};
160196
size_t connected_count{0};
197+
size_t disconnected_count{0};
161198
//! Thread variable should be after other struct members so the thread does
162199
//! not start until the other members are initialized.
163200
std::thread thread;
164-
165201
};
166202

167203
KJ_TEST("ListenConnections accepts incoming connections")
@@ -173,6 +209,50 @@ KJ_TEST("ListenConnections accepts incoming connections")
173209
KJ_EXPECT(client->client->add(1, 2) == 3);
174210
}
175211

212+
KJ_TEST("ListenConnections enforces a local connection limit")
213+
{
214+
ListenSetup setup(/*max_connections=*/1);
215+
216+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
217+
setup.WaitForConnectedCount(1);
218+
KJ_EXPECT(client1->client->add(1, 2) == 3);
219+
220+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
221+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
222+
KJ_EXPECT(setup.ConnectedCount() == 1);
223+
224+
client1.reset();
225+
setup.WaitForDisconnectedCount(1);
226+
setup.WaitForConnectedCount(2);
227+
228+
KJ_EXPECT(client2->client->add(2, 3) == 5);
229+
230+
client2.reset();
231+
setup.WaitForDisconnectedCount(2);
232+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
233+
234+
auto client3 = std::make_unique<ClientSetup>(setup.listener.Connect());
235+
setup.WaitForConnectedCount(3);
236+
KJ_EXPECT(client3->client->add(3, 4) == 7);
237+
}
238+
239+
KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limit")
240+
{
241+
ListenSetup setup(/*max_connections=*/2);
242+
243+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
244+
setup.WaitForConnectedCount(1);
245+
KJ_EXPECT(client1->client->add(1, 2) == 3);
246+
247+
client1.reset();
248+
setup.WaitForDisconnectedCount(1);
249+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
250+
251+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
252+
setup.WaitForConnectedCount(2);
253+
KJ_EXPECT(client2->client->add(2, 3) == 5);
254+
}
255+
176256
} // namespace
177257
} // namespace test
178258
} // namespace mp

test/mp/test/test.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <chrono>
1313
#include <condition_variable>
1414
#include <cstdint>
15+
#include <cstring>
1516
#include <functional>
1617
#include <future>
1718
#include <kj/async.h>

0 commit comments

Comments
 (0)