Skip to content

Commit 84ed607

Browse files
committed
proxy: add local connection limit to ListenConnections
1 parent 3edbe8f commit 84ed607

5 files changed

Lines changed: 230 additions & 12 deletions

File tree

doc/usage.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class
1010

1111
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.
1212

13-
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
13+
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
1414

1515
## Example
1616

doc/versions.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ Library versions are tracked with simple
77
Versioning policy is described in the [version.h](../include/mp/version.h)
88
include.
99

10-
## v10
10+
## v11
1111
- Current unstable version.
12+
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
13+
so servers can stop accepting new connections when a local connection cap is reached,
14+
and resume accepting after existing connections disconnect.
15+
16+
## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
17+
- Prior unstable version before local listener connection-limit support.
1218

1319
## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0)
1420
- Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes.

include/mp/proxy-io.h

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
#include <capnp/rpc-twoparty.h>
1414

1515
#include <assert.h>
16+
#include <algorithm>
1617
#include <condition_variable>
18+
#include <cstdlib>
1719
#include <functional>
1820
#include <kj/function.h>
1921
#include <map>
@@ -820,8 +822,19 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
820822
//! handles requests from the stream by calling the init object. Embed the
821823
//! ProxyServer in a Connection object that is stored and erased if
822824
//! disconnected. This should be called from the event loop thread.
825+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
826+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect);
827+
823828
template <typename InitInterface, typename InitImpl>
824829
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
830+
{
831+
_Serve<InitInterface>(loop, kj::mv(stream), init, [] {});
832+
}
833+
834+
//! Internal helper supporting additional disconnect actions for listeners that
835+
//! need to resume accepting connections when capacity becomes available.
836+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
837+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
825838
{
826839
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827840
// Disable deleter so proxy server object doesn't attempt to delete the
@@ -831,23 +844,45 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
831844
});
832845
auto it = loop.m_incoming_connections.begin();
833846
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
834-
it->onDisconnect([&loop, it] {
847+
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
835848
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
836849
loop.m_incoming_connections.erase(it);
850+
on_disconnect();
837851
});
838852
}
839853

840854
//! Given connection receiver and an init object, handle incoming connections by
841855
//! calling _Serve, to create ProxyServer objects and forward requests to the
842856
//! init object.
857+
struct ListenState
858+
{
859+
explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_)
860+
: listener(kj::mv(listener_)), max_connections(max_connections_) {}
861+
862+
kj::Own<kj::ConnectionReceiver> listener;
863+
std::optional<size_t> max_connections;
864+
size_t active_connections{0};
865+
bool accept_pending{false};
866+
};
867+
843868
template <typename InitInterface, typename InitImpl>
844-
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
869+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
845870
{
846-
auto* ptr = listener.get();
871+
if (state->accept_pending) return;
872+
if (state->max_connections && state->active_connections >= *state->max_connections) return;
873+
874+
state->accept_pending = true;
875+
auto* ptr = state->listener.get();
847876
loop.m_task_set->add(ptr->accept().then(
848-
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
849-
_Serve<InitInterface>(loop, kj::mv(stream), init);
850-
_Listen<InitInterface>(loop, kj::mv(listener), init);
877+
[&loop, &init, state](kj::Own<kj::AsyncIoStream>&& stream) mutable {
878+
state->accept_pending = false;
879+
++state->active_connections;
880+
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
881+
assert(state->active_connections > 0);
882+
--state->active_connections;
883+
_Listen<InitInterface>(loop, init, state);
884+
});
885+
_Listen<InitInterface>(loop, init, state);
851886
}));
852887
}
853888

@@ -863,12 +898,12 @@ void ServeStream(EventLoop& loop, int fd, InitImpl& init)
863898
//! Given listening socket file descriptor and an init object, handle incoming
864899
//! connections and requests by calling methods on the Init object.
865900
template <typename InitInterface, typename InitImpl>
866-
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
901+
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
867902
{
868903
loop.sync([&]() {
869-
_Listen<InitInterface>(loop,
904+
_Listen<InitInterface>(loop, init, std::make_shared<ListenState>(
870905
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871-
init);
906+
max_connections));
872907
});
873908
}
874909

include/mp/version.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! pointing at the prior merge commit. The /doc/versions.md file should also be
2525
//! updated, noting any significant or incompatible changes made since the
2626
//! previous version.
27-
#define MP_MAJOR_VERSION 10
27+
#define MP_MAJOR_VERSION 11
2828

2929
//! Minor version number. Should be incremented in stable branches after
3030
//! backporting changes. The /doc/versions.md file should also be updated to

test/mp/test/test.cpp

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <chrono>
1313
#include <condition_variable>
1414
#include <cstdint>
15+
#include <cstdlib>
1516
#include <cstring>
1617
#include <functional>
1718
#include <future>
@@ -27,19 +28,175 @@
2728
#include <mp/proxy-io.h>
2829
#include <mp/util.h>
2930
#include <mp/version.h>
31+
#include <mutex>
3032
#include <optional>
33+
#include <ratio> // IWYU pragma: keep
3134
#include <set>
35+
#include <sys/socket.h>
36+
#include <sys/un.h>
3237
#include <stdexcept>
3338
#include <string>
3439
#include <string_view>
3540
#include <thread>
3641
#include <type_traits>
42+
#include <unistd.h>
3743
#include <utility>
3844
#include <vector>
3945

4046
namespace mp {
4147
namespace test {
4248

49+
namespace {
50+
51+
class UnixListener
52+
{
53+
public:
54+
UnixListener()
55+
{
56+
char dir_template[] = "/tmp/mptest-listener-XXXXXX";
57+
char* dir = mkdtemp(dir_template);
58+
KJ_REQUIRE(dir != nullptr);
59+
m_dir = dir;
60+
m_path = m_dir + "/socket";
61+
62+
m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
63+
KJ_REQUIRE(m_fd >= 0);
64+
65+
sockaddr_un addr{};
66+
addr.sun_family = AF_UNIX;
67+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
68+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
69+
KJ_REQUIRE(bind(m_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
70+
KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0);
71+
}
72+
73+
~UnixListener()
74+
{
75+
if (m_fd >= 0) close(m_fd);
76+
if (!m_path.empty()) unlink(m_path.c_str());
77+
if (!m_dir.empty()) rmdir(m_dir.c_str());
78+
}
79+
80+
int release()
81+
{
82+
int fd = m_fd;
83+
m_fd = -1;
84+
return fd;
85+
}
86+
87+
int Connect() const
88+
{
89+
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
90+
KJ_REQUIRE(fd >= 0);
91+
92+
sockaddr_un addr{};
93+
addr.sun_family = AF_UNIX;
94+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
95+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
96+
KJ_REQUIRE(connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
97+
return fd;
98+
}
99+
100+
private:
101+
int m_fd{-1};
102+
std::string m_dir;
103+
std::string m_path;
104+
};
105+
106+
class ClientSetup
107+
{
108+
public:
109+
explicit ClientSetup(int fd)
110+
: thread([this, fd] {
111+
EventLoop loop("mptest-client", [](mp::LogMessage log) {
112+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
113+
});
114+
client_promise.set_value(ConnectStream<messages::FooInterface>(loop, fd));
115+
loop.loop();
116+
})
117+
{
118+
client = client_promise.get_future().get();
119+
}
120+
121+
~ClientSetup()
122+
{
123+
client.reset();
124+
thread.join();
125+
}
126+
127+
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
128+
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
129+
std::thread thread;
130+
};
131+
132+
class ListenSetup
133+
{
134+
public:
135+
explicit ListenSetup(size_t max_connections)
136+
: thread([this, max_connections] {
137+
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
138+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
139+
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
140+
std::lock_guard<std::mutex> lock(counter_mutex);
141+
++connected_count;
142+
counter_cv.notify_all();
143+
} else if (log.message.find("IPC server: socket disconnected.") != std::string::npos) {
144+
std::lock_guard<std::mutex> lock(counter_mutex);
145+
++disconnected_count;
146+
counter_cv.notify_all();
147+
}
148+
});
149+
FooImplementation foo;
150+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
151+
ready_promise.set_value();
152+
loop.loop();
153+
})
154+
{
155+
ready_promise.get_future().get();
156+
}
157+
158+
~ListenSetup()
159+
{
160+
thread.join();
161+
}
162+
163+
size_t ConnectedCount()
164+
{
165+
std::lock_guard<std::mutex> lock(counter_mutex);
166+
return connected_count;
167+
}
168+
169+
void WaitForConnectedCount(size_t expected_count)
170+
{
171+
std::unique_lock<std::mutex> lock(counter_mutex);
172+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
173+
bool matched = counter_cv.wait_until(lock, deadline, [&] {
174+
return connected_count >= expected_count;
175+
});
176+
KJ_REQUIRE(matched);
177+
}
178+
179+
void WaitForDisconnectedCount(size_t expected_count)
180+
{
181+
std::unique_lock<std::mutex> lock(counter_mutex);
182+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
183+
bool matched = counter_cv.wait_until(lock, deadline, [&] {
184+
return disconnected_count >= expected_count;
185+
});
186+
KJ_REQUIRE(matched);
187+
}
188+
189+
UnixListener listener;
190+
std::promise<void> ready_promise;
191+
std::thread thread;
192+
std::mutex counter_mutex;
193+
std::condition_variable counter_cv;
194+
size_t connected_count{0};
195+
size_t disconnected_count{0};
196+
};
197+
198+
} // namespace
199+
43200
/** Check version.h header values */
44201
constexpr auto kMP_MAJOR_VERSION{MP_MAJOR_VERSION};
45202
constexpr auto kMP_MINOR_VERSION{MP_MINOR_VERSION};
@@ -481,5 +638,25 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread")
481638
KJ_EXPECT(expected == 400);
482639
}
483640

641+
KJ_TEST("ListenConnections enforces a local connection limit")
642+
{
643+
ListenSetup setup(/*max_connections=*/1);
644+
645+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
646+
setup.WaitForConnectedCount(1);
647+
KJ_EXPECT(client1->client->add(1, 2) == 3);
648+
649+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
650+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
651+
KJ_EXPECT(setup.ConnectedCount() == 1);
652+
653+
client1.reset();
654+
setup.WaitForDisconnectedCount(1);
655+
setup.WaitForConnectedCount(2);
656+
657+
KJ_EXPECT(client2->client->add(2, 3) == 5);
658+
client2.reset();
659+
}
660+
484661
} // namespace test
485662
} // namespace mp

0 commit comments

Comments
 (0)