Skip to content

Commit 3ef8e5c

Browse files
committed
proxy: add local connection limit to ListenConnections
1 parent 3edbe8f commit 3ef8e5c

5 files changed

Lines changed: 229 additions & 12 deletions

File tree

doc/usage.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class
1010

1111
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.
1212

13-
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
13+
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
1414

1515
## Example
1616

doc/versions.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ Library versions are tracked with simple
77
Versioning policy is described in the [version.h](../include/mp/version.h)
88
include.
99

10-
## v10
10+
## v11
1111
- Current unstable version.
12+
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
13+
so servers can stop accepting new connections when a local connection cap is reached,
14+
and resume accepting after existing connections disconnect.
15+
16+
## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
17+
- Prior unstable version before local listener connection-limit support.
1218

1319
## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0)
1420
- Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes.

include/mp/proxy-io.h

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
#include <capnp/rpc-twoparty.h>
1414

1515
#include <assert.h>
16+
#include <algorithm>
1617
#include <condition_variable>
18+
#include <cstdlib>
1719
#include <functional>
1820
#include <kj/function.h>
1921
#include <map>
@@ -820,8 +822,19 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
820822
//! handles requests from the stream by calling the init object. Embed the
821823
//! ProxyServer in a Connection object that is stored and erased if
822824
//! disconnected. This should be called from the event loop thread.
825+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
826+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect);
827+
823828
template <typename InitInterface, typename InitImpl>
824829
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
830+
{
831+
_Serve<InitInterface>(loop, kj::mv(stream), init, [] {});
832+
}
833+
834+
//! Internal helper supporting additional disconnect actions for listeners that
835+
//! need to resume accepting connections when capacity becomes available.
836+
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
837+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
825838
{
826839
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827840
// Disable deleter so proxy server object doesn't attempt to delete the
@@ -831,23 +844,45 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
831844
});
832845
auto it = loop.m_incoming_connections.begin();
833846
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
834-
it->onDisconnect([&loop, it] {
847+
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
835848
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
836849
loop.m_incoming_connections.erase(it);
850+
on_disconnect();
837851
});
838852
}
839853

840854
//! Given connection receiver and an init object, handle incoming connections by
841855
//! calling _Serve, to create ProxyServer objects and forward requests to the
842856
//! init object.
857+
struct ListenState
858+
{
859+
explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_)
860+
: listener(kj::mv(listener_)), max_connections(max_connections_) {}
861+
862+
kj::Own<kj::ConnectionReceiver> listener;
863+
std::optional<size_t> max_connections;
864+
size_t active_connections{0};
865+
bool accept_pending{false};
866+
};
867+
843868
template <typename InitInterface, typename InitImpl>
844-
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
869+
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
845870
{
846-
auto* ptr = listener.get();
871+
if (state->accept_pending) return;
872+
if (state->max_connections && state->active_connections >= *state->max_connections) return;
873+
874+
state->accept_pending = true;
875+
auto* ptr = state->listener.get();
847876
loop.m_task_set->add(ptr->accept().then(
848-
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
849-
_Serve<InitInterface>(loop, kj::mv(stream), init);
850-
_Listen<InitInterface>(loop, kj::mv(listener), init);
877+
[&loop, &init, state](kj::Own<kj::AsyncIoStream>&& stream) mutable {
878+
state->accept_pending = false;
879+
++state->active_connections;
880+
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
881+
assert(state->active_connections > 0);
882+
--state->active_connections;
883+
_Listen<InitInterface>(loop, init, state);
884+
});
885+
_Listen<InitInterface>(loop, init, state);
851886
}));
852887
}
853888

@@ -863,12 +898,12 @@ void ServeStream(EventLoop& loop, int fd, InitImpl& init)
863898
//! Given listening socket file descriptor and an init object, handle incoming
864899
//! connections and requests by calling methods on the Init object.
865900
template <typename InitInterface, typename InitImpl>
866-
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
901+
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
867902
{
868903
loop.sync([&]() {
869-
_Listen<InitInterface>(loop,
904+
_Listen<InitInterface>(loop, init, std::make_shared<ListenState>(
870905
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871-
init);
906+
max_connections));
872907
});
873908
}
874909

include/mp/version.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! pointing at the prior merge commit. The /doc/versions.md file should also be
2525
//! updated, noting any significant or incompatible changes made since the
2626
//! previous version.
27-
#define MP_MAJOR_VERSION 10
27+
#define MP_MAJOR_VERSION 11
2828

2929
//! Minor version number. Should be incremented in stable branches after
3030
//! backporting changes. The /doc/versions.md file should also be updated to

test/mp/test/test.cpp

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <chrono>
1313
#include <condition_variable>
1414
#include <cstdint>
15+
#include <cstdlib>
1516
#include <cstring>
1617
#include <functional>
1718
#include <future>
@@ -27,19 +28,174 @@
2728
#include <mp/proxy-io.h>
2829
#include <mp/util.h>
2930
#include <mp/version.h>
31+
#include <mutex>
3032
#include <optional>
3133
#include <set>
34+
#include <sys/socket.h>
35+
#include <sys/un.h>
3236
#include <stdexcept>
3337
#include <string>
3438
#include <string_view>
3539
#include <thread>
3640
#include <type_traits>
41+
#include <unistd.h>
3742
#include <utility>
3843
#include <vector>
3944

4045
namespace mp {
4146
namespace test {
4247

48+
namespace {
49+
50+
class UnixListener
51+
{
52+
public:
53+
UnixListener()
54+
{
55+
char dir_template[] = "/tmp/mptest-listener-XXXXXX";
56+
char* dir = mkdtemp(dir_template);
57+
KJ_REQUIRE(dir != nullptr);
58+
m_dir = dir;
59+
m_path = m_dir + "/socket";
60+
61+
m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
62+
KJ_REQUIRE(m_fd >= 0);
63+
64+
sockaddr_un addr{};
65+
addr.sun_family = AF_UNIX;
66+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
67+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
68+
KJ_REQUIRE(bind(m_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
69+
KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0);
70+
}
71+
72+
~UnixListener()
73+
{
74+
if (m_fd >= 0) close(m_fd);
75+
if (!m_path.empty()) unlink(m_path.c_str());
76+
if (!m_dir.empty()) rmdir(m_dir.c_str());
77+
}
78+
79+
int release()
80+
{
81+
int fd = m_fd;
82+
m_fd = -1;
83+
return fd;
84+
}
85+
86+
int Connect() const
87+
{
88+
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
89+
KJ_REQUIRE(fd >= 0);
90+
91+
sockaddr_un addr{};
92+
addr.sun_family = AF_UNIX;
93+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
94+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
95+
KJ_REQUIRE(connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
96+
return fd;
97+
}
98+
99+
private:
100+
int m_fd{-1};
101+
std::string m_dir;
102+
std::string m_path;
103+
};
104+
105+
class ClientSetup
106+
{
107+
public:
108+
explicit ClientSetup(int fd)
109+
: thread([this, fd] {
110+
EventLoop loop("mptest-client", [](mp::LogMessage log) {
111+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
112+
});
113+
client_promise.set_value(ConnectStream<messages::FooInterface>(loop, fd));
114+
loop.loop();
115+
})
116+
{
117+
client = client_promise.get_future().get();
118+
}
119+
120+
~ClientSetup()
121+
{
122+
client.reset();
123+
thread.join();
124+
}
125+
126+
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
127+
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
128+
std::thread thread;
129+
};
130+
131+
class ListenSetup
132+
{
133+
public:
134+
explicit ListenSetup(size_t max_connections)
135+
: thread([this, max_connections] {
136+
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
137+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
138+
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
139+
std::lock_guard<std::mutex> lock(counter_mutex);
140+
++connected_count;
141+
counter_cv.notify_all();
142+
} else if (log.message.find("IPC server: socket disconnected.") != std::string::npos) {
143+
std::lock_guard<std::mutex> lock(counter_mutex);
144+
++disconnected_count;
145+
counter_cv.notify_all();
146+
}
147+
});
148+
FooImplementation foo;
149+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
150+
ready_promise.set_value();
151+
loop.loop();
152+
})
153+
{
154+
ready_promise.get_future().get();
155+
}
156+
157+
~ListenSetup()
158+
{
159+
thread.join();
160+
}
161+
162+
size_t ConnectedCount()
163+
{
164+
std::lock_guard<std::mutex> lock(counter_mutex);
165+
return connected_count;
166+
}
167+
168+
void WaitForConnectedCount(size_t expected_count)
169+
{
170+
std::unique_lock<std::mutex> lock(counter_mutex);
171+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
172+
bool matched = counter_cv.wait_until(lock, deadline, [&] {
173+
return connected_count >= expected_count;
174+
});
175+
KJ_REQUIRE(matched);
176+
}
177+
178+
void WaitForDisconnectedCount(size_t expected_count)
179+
{
180+
std::unique_lock<std::mutex> lock(counter_mutex);
181+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
182+
bool matched = counter_cv.wait_until(lock, deadline, [&] {
183+
return disconnected_count >= expected_count;
184+
});
185+
KJ_REQUIRE(matched);
186+
}
187+
188+
UnixListener listener;
189+
std::promise<void> ready_promise;
190+
std::thread thread;
191+
std::mutex counter_mutex;
192+
std::condition_variable counter_cv;
193+
size_t connected_count{0};
194+
size_t disconnected_count{0};
195+
};
196+
197+
} // namespace
198+
43199
/** Check version.h header values */
44200
constexpr auto kMP_MAJOR_VERSION{MP_MAJOR_VERSION};
45201
constexpr auto kMP_MINOR_VERSION{MP_MINOR_VERSION};
@@ -481,5 +637,25 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread")
481637
KJ_EXPECT(expected == 400);
482638
}
483639

640+
KJ_TEST("ListenConnections enforces a local connection limit")
641+
{
642+
ListenSetup setup(/*max_connections=*/1);
643+
644+
auto client1 = std::make_unique<ClientSetup>(setup.listener.Connect());
645+
setup.WaitForConnectedCount(1);
646+
KJ_EXPECT(client1->client->add(1, 2) == 3);
647+
648+
auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
649+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
650+
KJ_EXPECT(setup.ConnectedCount() == 1);
651+
652+
client1.reset();
653+
setup.WaitForDisconnectedCount(1);
654+
setup.WaitForConnectedCount(2);
655+
656+
KJ_EXPECT(client2->client->add(2, 3) == 5);
657+
client2.reset();
658+
}
659+
484660
} // namespace test
485661
} // namespace mp

0 commit comments

Comments
 (0)