proxy: add local connection limit to ListenConnections#269
Conversation
|
The following sections might be updated with supplementary metadata relevant to reviewers and maintainers. ReviewsSee the guideline for information on the review process.
If your review is incorrectly listed, please copy-paste ConflictsReviewers, this pull request conflicts with the following ones:
If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first. |
3ef8e5c to
84ed607
Compare
ryanofsky
left a comment
There was a problem hiding this comment.
Approach ACK 84ed607. Implementation of local connection limit here looks almost exactly like I would have expected.
I do think it would be helpful to see a draft PR in the bitcoin repo using this API (it should be fine to make libmultiprocess changes there and let the lint CI job fail so you don't need to mess with subtrees) because the approach in bitcoin/bitcoin#34978 of adding a global connection limit option isn't exactly compatible with the implementation here of implementing a per-address connection limit.
I'd personally prefer using per-address limits over introducing a global limit but both approaches seem reasonable
| void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect); | ||
|
|
||
| template <typename InitInterface, typename InitImpl> | ||
| void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init) |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (84ed607)
Since _Serve is an internal function only called in a few places, I think it would be less confusing if it was not overloaded, and just always required an on_disconnect parameter, even if it requires a little extra verbosity at some call sites.
There was a problem hiding this comment.
Done. Simplified this now so _Serve always takes an on_disconnect callback instead of overloading it. Since it is only used internally
| kj::Own<kj::ConnectionReceiver> listener; | ||
| std::optional<size_t> max_connections; | ||
| size_t active_connections{0}; | ||
| bool accept_pending{false}; |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (84ed607)
Curious if this accept_pending variable is actually necessary or if code could just compare active_connections and max_connections when deciding whether to listen. Would prefer to avoid redundancy in the state representation if possible even if makes individual checks more a little more verbose.
If accept_pending really is necessary would be a good to have a short comment about why.
There was a problem hiding this comment.
I kept accept_pending, but added a short comment explaining why it is needed here. active_connections only counts accepted connections, so without a separate flag nested _Listen() calls could post multiple pending accept() calls before active_connections is incremented.
There was a problem hiding this comment.
FWIW I tested the code without accept_pending field and hence its check at _Listen removed, the tests passed with no issues. Then, asked Claude to generate a test that exercises it and produced this:
diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp
index b367938..78298c7 100644
--- a/test/mp/test/listen_tests.cpp
+++ b/test/mp/test/listen_tests.cpp
@@ -24,8 +24,10 @@
#include <string>
#include <sys/socket.h>
#include <sys/un.h>
+#include <kj/exception.h>
#include <thread>
#include <unistd.h>
+#include <vector>
namespace mp {
namespace test {
@@ -112,11 +114,39 @@ public:
std::thread thread;
};
+//! kj::ExceptionCallback that captures KJ_LOG output into an external sink.
+//! Must be instantiated on the thread whose KJ logs you want to capture; it
+//! installs itself onto that thread's ExceptionCallback stack via its base
+//! constructor and removes itself in the destructor.
+class CaptureLogCallback : public kj::ExceptionCallback
+{
+public:
+ CaptureLogCallback(std::mutex& mu, std::string& sink) : m_mu(mu), m_sink(sink) {}
+
+ void logMessage(kj::LogSeverity severity, const char* file, int line, int contextDepth,
+ kj::String&& text) override
+ {
+ {
+ std::lock_guard<std::mutex> lock(m_mu);
+ m_sink.append(text.cStr(), text.size());
+ m_sink.push_back('\n');
+ }
+ // Still let the default callback emit to stderr so test debug output
+ // isn't silenced for other observers.
+ kj::ExceptionCallback::logMessage(severity, file, line, contextDepth, kj::mv(text));
+ }
+
+private:
+ std::mutex& m_mu;
+ std::string& m_sink;
+};
+
class ListenSetup
{
public:
explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
: capped_listener(max_connections.has_value()), thread([this, max_connections] {
+ CaptureLogCallback log_capture(captured_log_mutex, captured_log);
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
@@ -144,6 +174,18 @@ public:
~ListenSetup()
{
+ forceShutdown();
+ thread.join();
+ }
+
+ //! Synchronously tear down the event loop's task set so any pending accept
+ //! promises are destroyed now (rather than when the destructor runs later).
+ //! This makes it possible to assert on captured KJ log output before the
+ //! ListenSetup goes out of scope. Idempotent.
+ void forceShutdown()
+ {
+ if (shutdown_done) return;
+ shutdown_done = true;
if (capped_listener) {
EventLoop* loop;
{
@@ -152,7 +194,6 @@ public:
}
if (loop) loop->sync([&] { loop->m_task_set.reset(); });
}
- thread.join();
}
size_t ConnectedCount()
@@ -184,11 +225,15 @@ public:
UnixListener listener;
std::promise<void> ready_promise;
bool capped_listener{false};
+ bool shutdown_done{false};
std::mutex counter_mutex;
std::condition_variable counter_cv;
EventLoop* event_loop{nullptr};
size_t connected_count{0};
size_t disconnected_count{0};
+ //! KJ log output captured from the server thread via CaptureLogCallback.
+ std::mutex captured_log_mutex;
+ std::string captured_log;
std::thread thread;
};
@@ -245,6 +290,34 @@ KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limi
KJ_EXPECT(client2->client->add(2, 3) == 5);
}
+// Without `accept_pending`, cascaded close handlers each post a duplicate
+// accept(). KJ silently serializes them so the cap isn't exceeded, but the
+// extra pending promises are destroyed at cleanup and logged as
+// "PromiseFulfiller was destroyed without fulfilling the promise."
+// This test fails when accept_pending is removed and passes when it's intact.
+KJ_TEST("ListenConnections does not leak accept promises during disconnect burst")
+{
+ constexpr size_t kCap = 2;
+ ListenSetup setup(/*max_connections=*/kCap);
+
+ std::vector<std::unique_ptr<ClientSetup>> filling;
+ filling.reserve(kCap);
+ for (size_t i = 0; i < kCap; ++i) {
+ filling.push_back(std::make_unique<ClientSetup>(setup.listener.Connect()));
+ }
+ setup.WaitForConnectedCount(kCap);
+
+ filling.clear();
+ setup.WaitForDisconnectedCount(kCap);
+
+ // Trigger m_task_set.reset() now so any leaked accept promises get destroyed
+ // before we read captured_log.
+ setup.forceShutdown();
+
+ std::lock_guard<std::mutex> lock(setup.captured_log_mutex);
+ KJ_EXPECT(setup.captured_log.find("PromiseFulfiller was destroyed") == std::string::npos);
+}
+
} // namespace
} // namespace test
} // namespace mpBasically what this does is install a kj::ExceptionCallback in the server thread to capture the log "PromiseFulfiller was destroyed" generated by KJ runtime if cascaded disconnects each call _Listen and post a fresh accept() promise. The test fails with accept_pending removed and pass with it back.
There was a problem hiding this comment.
IMO it's not so obvious why this field is needed here, this patch basically guarantee the same outcome and also pass the test generated by Claude above:
diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 78924c6..c002811 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -851,11 +851,6 @@ struct ListenState
kj::Own<kj::ConnectionReceiver> listener;
std::optional<size_t> max_connections;
size_t active_connections{0};
- //! Tracks whether accept() has already been posted. This is needed because
- //! active_connections only counts accepted connections, so without a
- //! separate flag, nested _Listen() calls could queue multiple pending
- //! accepts before active_connections increases.
- bool accept_pending{false};
};
template <typename InitInterface, typename InitImpl>
@@ -866,9 +861,12 @@ void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<Liste
{
++state->active_connections;
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
+ const bool was_at_cap = state->max_connections && state->active_connections == *state->max_connections;
assert(state->active_connections > 0);
--state->active_connections;
- _Listen<InitInterface>(loop, init, state);
+ if (was_at_cap) {
+ _Listen<InitInterface>(loop, init, state);
+ }
});
}
@@ -885,15 +883,12 @@ inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, con
template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
{
- if (state->accept_pending) return;
if (_ListenAtCapacity(*state)) return;
- state->accept_pending = true;
auto* ptr = state->listener.get();
auto accept_ref{_MakeCappedListenerRef(loop, *state)};
loop.m_task_set->add(ptr->accept().then(
[&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
- state->accept_pending = false;
_ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
_Listen<InitInterface>(loop, init, state);
}));There was a problem hiding this comment.
I think this is a better invariant than tracking accept_pending.
There should already be one pending accept whenever the capped listener is below capacity, and disconnects only need to post a new accept when they transition the listener from full to below full, because that is the only state where nocaccept was pending.
So checking this before decrementing active_connections avoids the duplicate-accept case without adding another state variable.
Taken and decided to use resume_accept for the local boolean instead
|
|
||
| namespace { | ||
|
|
||
| class UnixListener |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (84ed607)
Would be good to introduce the test in a separate commit and maybe separate file if it doesn't share much code with existing test.
IMO would be nice if it test was introduced in an intial commit then updated after connection limits are added so it's easier to see how connection limits are tested separately from connection setup.
There was a problem hiding this comment.
Split the listener coverage out into a dedicated listen_tests.cpp file and reordered the history so the baseline ListenConnections() test is introduced first, then extended in follow-up commit with the local connection limit coverage.
57e7070 to
8511c68
Compare
|
Thanks for the review @ryanofsky . I addressed the cleanup points in the latest push:
I’m also planning to put together a draft Bitcoin Core PR using this API so the per-address approach can be evaluated downstream against the current global-limit direction. |
|
I put together the downstream draft using this API here bitcoin/bitcoin#35037 It uses per |
|
This PR is now ready for review |
xyzconstant
left a comment
There was a problem hiding this comment.
Code review ACK 8511c68
Reviewed each commit separately, compiled and ran tests. The changes look good to me, only left a couple of inline nits noting a compilation error + failing tests in the first commit.
| } | ||
| }); | ||
| FooImplementation foo; | ||
| ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections); |
| KJ_EXPECT(client->client->add(1, 2) == 3); | ||
| } | ||
|
|
||
| KJ_TEST("ListenConnections enforces a local connection limit") |
There was a problem hiding this comment.
nit: Unlike in ListenConnections's call issue (compilation error), this will still fail for this commit (8c47a3a). Following ryanosfky's reasoning (#269 (comment)) I believe this suite would be better introduced in 8511c68 so the test lands with the feature it exercises.
8511c68 to
b36e98b
Compare
|
Thanks for the review @xyzconstant . Fixed both commit-structure issues: the first test commit now only adds baseline |
|
Also tightened the capped listener behavior. It now stops posting accepts once the limit is reached, and keeps capped pending accepts alive so later clients can connect after an idle gap, including before the cap has been reached. Added coverage for the reconnect cases as well |
b36e98b to
19e1386
Compare
| [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable { | ||
| state->accept_pending = false; | ||
| _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream)); | ||
| if (_ListenAtCapacity(*state)) return; |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (19e1386)
Not sure if I'm missing something but this check here seems redundant with the same _ListenAtCapacity check at the start of _Listen (line 889).
I tested commenting this line out and left the upper-level check (and vice-versa) and the tests passed with no issues. I'd suggest dropping any of these duplicates.
There was a problem hiding this comment.
Good catch, thanks. Dropped the lower _ListenAtCapacity() check since _Listen() already handles the capacity check before posting another accept.
19e1386 to
b0207dd
Compare
| [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable { | ||
| state->accept_pending = false; | ||
| _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream)); | ||
| _Listen<InitInterface>(loop, init, state); |
There was a problem hiding this comment.
With the accept_pending check in place, this _Listen call will never be reached.
NOTE: if you apply this patch here (comment), then it will be needed here because we can't rely on the second _Listen call in _ServeAccepted which is executed conditionally after active_connections reached the cap.
|
Thanks for the update @enirox001! I've been playing around with the PR code more throughly this time and have a different take on Overall the code is factually correct and works as expected. And I think it could be merged (despite my latest thoughts on |
3f8823b to
7e51ab2
Compare
Thanks for taking another look @xyzconstant I dropped |
|
re-ACK 7e51ab2 Thanks for the updates @enirox001! |
| public: | ||
| explicit ClientSetup(int fd) | ||
| : thread([this, fd] { | ||
| EventLoop loop("mptest-client", [](mp::LogMessage log) { |
There was a problem hiding this comment.
In commit "test: add dedicated ListenConnections coverage" (d0cff01)
Here and below, it might be good to add KJ_LOG(INFO, log.level, log.message); to show debug logs when --verbose is used, like the other test file.
I think it could also be good to add a more generic test setup class that can be reused in tests and deduplicate the EventLoop thread/join/promise code that is now repeated in 3 classes. It could have std::function hooks for running code before loop.loop(), or when messages are logged. But this is more of an idea for a followup, current code seems fine.
There was a problem hiding this comment.
Done. Added KJ_LOG(INFO, log.level, log.message); to the logging callbacks of EventLoop in both ClientSetup and ListenSetup to ensure debug logs print correctly under --verbose .
Regarding the generic test setup helper, that is a great suggestion to deduplicate the event loop/thread boilerplates. I agree it would be a clean follow-up improvement.
| : thread([this] { | ||
| EventLoop loop("mptest-server", [this](mp::LogMessage log) { | ||
| if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); | ||
| if (log.message.find("IPC server: socket connected.") != std::string::npos) { |
There was a problem hiding this comment.
In commit "test: add dedicated ListenConnections coverage" (d0cff01)
Maybe it would be good to add a testing hook for this (grep for testing_hook_ for examples), instead of checking for a log message.
Checking for a log message does seem ok though, and may be even better if the goal is to catch regressions since it doesn't require changing test and non-test code at the same time.
There was a problem hiding this comment.
Done. I added a testing_hook_connected member to EventLoop and invoked it inside _Serve .
ListenSetup now hooks into this callback to count connections, which makes the tests more robust and consistent with the other testing_hook hooks in the codebase.
|
|
||
| std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise; | ||
| std::unique_ptr<ProxyClient<messages::FooInterface>> client; | ||
| std::thread thread; |
There was a problem hiding this comment.
In commit "test: add dedicated ListenConnections coverage" (d0cff01)
Other test class has a comment that may be useful here and below as well
//! Thread variable should be after other struct members so the thread does
//! not start until the other members are initialized.There was a problem hiding this comment.
Thanks, added this comment where appropriate.
| #include <mp/test/foo.capnp.h> | ||
| #include <mp/test/foo.capnp.proxy.h> | ||
|
|
||
| #include <string.h> // NOLINT(modernize-deprecated-headers) |
There was a problem hiding this comment.
In commit "test: add dedicated ListenConnections coverage" (d0cff01)
Unclear what reason is for this change. Probably should be a separate commit if it's necessary
There was a problem hiding this comment.
Thanks, removed this header.
| //! updated, noting any significant or incompatible changes made since the | ||
| //! previous version. | ||
| #define MP_MAJOR_VERSION 10 | ||
| #define MP_MAJOR_VERSION 11 |
There was a problem hiding this comment.
In commit "test: add dedicated ListenConnections coverage" (d0cff01)
Looks like this needs to be rebased since current version is already 11, and this should be bumping from 11>12.
Also would suggest bumping version in an initial new commit before the other changes, instead of alongside them. (For example see b15d63e from #274.) Splitting should the make the PR a little easier to review, and also allow creating a v11.0 tag that contains every commit MP_MAJOR_VERSION 11 set, and points at a merge.
There was a problem hiding this comment.
Done, split the version bumping from the other changes, and included it in the initial commit before other commits
| } | ||
|
|
||
| template <typename InitInterface, typename InitImpl> | ||
| void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state) |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)
This function used to have a code comment "//! Given connection receiver and an init object, handle..." that looks like it became detached. Would be good to move it down next this function and update it ("Given init object and a state object containing a connection receiver, handle...").
It might also be clearer to turn this into a ListenState method. (And if doing that, rename ListenState to Listener, rename listener to m_receiver, rename state to self, make ListenAtCapacity another method, etc). Just an idea though, I haven't studied this code enough yet.
There was a problem hiding this comment.
Done. The comment has been moved down above the _Listen function, and its wording has been updated to reflect the new init and state parameters as suggested.
Regarding refactoring _Listen into a member method of ListenState. I decided to keep it as a free function instead.
The _Listen template function registers asynchronous handlers via ptr->accept().then(...) , turning it into a method would require ListenState to inherit from std::enable_shared_from_this to pass self safely into the lambdas.
I think keeping it as a simple struct/free function avoids this boilerplate and remains consistent with the other helper functions in proxy-io.h
|
|
||
| template <typename InitInterface, typename InitImpl> | ||
| void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init) | ||
| void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state, kj::Own<kj::AsyncIoStream>&& stream) |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)
Having an extra _ServeAccepted function that is only called one place seems to make this code more complicated for little benefit. Any reason not to call _Serve directly from _Listen like previous code did?
There was a problem hiding this comment.
I agree that this is redundant. I removed the _ServeAccepted helper function entirely and placed the logic directly inside the _Listen callback. This reduces indirection and keeps the flow clean.
| KJ_EXPECT(client1->client->add(1, 2) == 3); | ||
|
|
||
| auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect()); | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)
Note: Sleep seems racy but harmless since that it shouldn't ever cause the test to fail when it should succeed, only to potentially succeed when it should fail. I can't think of a better way to check that a connection is NOT accepted other than maybe adding a testing_hook that exposes listening state, though so this might be the best approach for now.
There was a problem hiding this comment.
Since asserting that the second connection was not accepted inherently requires a brief window of time to pass,
The short sleep seems to be the most straightforward and reliable approach to test this behavior without adding state tracking hooks. I've left the sleep in place.
|
|
||
| inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, const ListenState& state) | ||
| { | ||
| return state.max_connections && *state.max_connections > 0 ? std::make_unique<EventLoopRef>(loop) : nullptr; |
There was a problem hiding this comment.
In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)
Would be helpful to have a code comment explaining this logic. I don't think I understand why it avoiding creating an EventLoopRef if there is no connection limit. I'm also not clear on why passing accept_ref around separately is needed. Naively I think I'd just expect ListenState to have an EventLoopRef member.
There was a problem hiding this comment.
Thanks, I added explanatory comments to both _MakeCappedListenerRef. I avoid holding EventLoopRef in the uncapped case because the loop is always listening, and holding a ref would prevent it from automatically exiting when all client connections close.
Passing accept_ref in the callback instead of storing it as a member would bind the ref's lifetime directly to the active accept() task. This means that when the listener reaches capacity, we return early and don't register a new task, which automatically destroys the ref and lets the loop exit.
7e51ab2 to
458cd5a
Compare
Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets. The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits.
458cd5a to
cc36777
Compare
Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects. Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state. Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects.
cc36777 to
1e8776f
Compare
|
Thanks for the review @ryanofsky, made the following changes in the latest push:
|
This adds an optional local connection limit to
ListenConnections().Previously,
ListenConnections()would accept incoming connections indefinitely. This branch adds an optionalmax_connectionsparameter so a listener can stop accepting new connections once a per-listener cap is reached, and resume accepting when an existing connection disconnects.The limit is local to the listener instead of global to the
EventLoop. This keeps the state and behavior scoped to the listening socket, and is closer to the direction discussed downstream for per--ipcbindlimits.This also adds a test covering the behavior with
max_connections=1, verifying that:This is a follow-up to the IPC FD reservation work