Skip to content

proxy: add local connection limit to ListenConnections#269

Open
enirox001 wants to merge 3 commits into
bitcoin-core:masterfrom
enirox001:04-26-ipc-local-connection-limit
Open

proxy: add local connection limit to ListenConnections#269
enirox001 wants to merge 3 commits into
bitcoin-core:masterfrom
enirox001:04-26-ipc-local-connection-limit

Conversation

@enirox001
Copy link
Copy Markdown
Contributor

@enirox001 enirox001 commented Apr 8, 2026

This adds an optional local connection limit toListenConnections().

Previously, ListenConnections() would accept incoming connections indefinitely. This branch adds an optional max_connections parameter so a listener can stop accepting new connections once a per-listener cap is reached, and resume accepting when an existing connection disconnects.

The limit is local to the listener instead of global to the EventLoop. This keeps the state and behavior scoped to the listening socket, and is closer to the direction discussed downstream for per--ipcbind limits.

This also adds a test covering the behavior with max_connections=1, verifying that:

  • the first client is accepted normally
  • a second client is not accepted while the first remains connected
  • the second client is accepted after the first disconnects

This is a follow-up to the IPC FD reservation work

@DrahtBot
Copy link
Copy Markdown

DrahtBot commented Apr 8, 2026

The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

Reviews

See the guideline for information on the review process.

Type Reviewers
Approach ACK ryanofsky
Stale ACK xyzconstant

If your review is incorrectly listed, please copy-paste <!--meta-tag:bot-skip--> into the comment that the bot should ignore.

Conflicts

Reviewers, this pull request conflicts with the following ones:

  • #274 (Add nonunix platform support by ryanofsky)

If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

@enirox001 enirox001 marked this pull request as draft April 8, 2026 10:01
@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch 3 times, most recently from 3ef8e5c to 84ed607 Compare April 8, 2026 10:53
Copy link
Copy Markdown
Collaborator

@ryanofsky ryanofsky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approach ACK 84ed607. Implementation of local connection limit here looks almost exactly like I would have expected.

I do think it would be helpful to see a draft PR in the bitcoin repo using this API (it should be fine to make libmultiprocess changes there and let the lint CI job fail so you don't need to mess with subtrees) because the approach in bitcoin/bitcoin#34978 of adding a global connection limit option isn't exactly compatible with the implementation here of implementing a per-address connection limit.

I'd personally prefer using per-address limits over introducing a global limit but both approaches seem reasonable

Comment thread include/mp/proxy-io.h Outdated
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect);

template <typename InitInterface, typename InitImpl>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (84ed607)

Since _Serve is an internal function only called in a few places, I think it would be less confusing if it was not overloaded, and just always required an on_disconnect parameter, even if it requires a little extra verbosity at some call sites.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Simplified this now so _Serve always takes an on_disconnect callback instead of overloading it. Since it is only used internally

Comment thread include/mp/proxy-io.h Outdated
kj::Own<kj::ConnectionReceiver> listener;
std::optional<size_t> max_connections;
size_t active_connections{0};
bool accept_pending{false};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (84ed607)

Curious if this accept_pending variable is actually necessary or if code could just compare active_connections and max_connections when deciding whether to listen. Would prefer to avoid redundancy in the state representation if possible even if makes individual checks more a little more verbose.

If accept_pending really is necessary would be a good to have a short comment about why.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept accept_pending, but added a short comment explaining why it is needed here. active_connections only counts accepted connections, so without a separate flag nested _Listen() calls could post multiple pending accept() calls before active_connections is incremented.

Copy link
Copy Markdown
Contributor

@xyzconstant xyzconstant May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I tested the code without accept_pending field and hence its check at _Listen removed, the tests passed with no issues. Then, asked Claude to generate a test that exercises it and produced this:

diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp
index b367938..78298c7 100644
--- a/test/mp/test/listen_tests.cpp
+++ b/test/mp/test/listen_tests.cpp
@@ -24,8 +24,10 @@
 #include <string>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <kj/exception.h>
 #include <thread>
 #include <unistd.h>
+#include <vector>
 
 namespace mp {
 namespace test {
@@ -112,11 +114,39 @@ public:
     std::thread thread;
 };
 
+//! kj::ExceptionCallback that captures KJ_LOG output into an external sink.
+//! Must be instantiated on the thread whose KJ logs you want to capture; it
+//! installs itself onto that thread's ExceptionCallback stack via its base
+//! constructor and removes itself in the destructor.
+class CaptureLogCallback : public kj::ExceptionCallback
+{
+public:
+    CaptureLogCallback(std::mutex& mu, std::string& sink) : m_mu(mu), m_sink(sink) {}
+
+    void logMessage(kj::LogSeverity severity, const char* file, int line, int contextDepth,
+                    kj::String&& text) override
+    {
+        {
+            std::lock_guard<std::mutex> lock(m_mu);
+            m_sink.append(text.cStr(), text.size());
+            m_sink.push_back('\n');
+        }
+        // Still let the default callback emit to stderr so test debug output
+        // isn't silenced for other observers.
+        kj::ExceptionCallback::logMessage(severity, file, line, contextDepth, kj::mv(text));
+    }
+
+private:
+    std::mutex& m_mu;
+    std::string& m_sink;
+};
+
 class ListenSetup
 {
 public:
     explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
         : capped_listener(max_connections.has_value()), thread([this, max_connections] {
+              CaptureLogCallback log_capture(captured_log_mutex, captured_log);
               EventLoop loop("mptest-server", [this](mp::LogMessage log) {
                   if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
                   if (log.message.find("IPC server: socket connected.") != std::string::npos) {
@@ -144,6 +174,18 @@ public:
 
     ~ListenSetup()
     {
+        forceShutdown();
+        thread.join();
+    }
+
+    //! Synchronously tear down the event loop's task set so any pending accept
+    //! promises are destroyed now (rather than when the destructor runs later).
+    //! This makes it possible to assert on captured KJ log output before the
+    //! ListenSetup goes out of scope. Idempotent.
+    void forceShutdown()
+    {
+        if (shutdown_done) return;
+        shutdown_done = true;
         if (capped_listener) {
             EventLoop* loop;
             {
@@ -152,7 +194,6 @@ public:
             }
             if (loop) loop->sync([&] { loop->m_task_set.reset(); });
         }
-        thread.join();
     }
 
     size_t ConnectedCount()
@@ -184,11 +225,15 @@ public:
     UnixListener listener;
     std::promise<void> ready_promise;
     bool capped_listener{false};
+    bool shutdown_done{false};
     std::mutex counter_mutex;
     std::condition_variable counter_cv;
     EventLoop* event_loop{nullptr};
     size_t connected_count{0};
     size_t disconnected_count{0};
+    //! KJ log output captured from the server thread via CaptureLogCallback.
+    std::mutex captured_log_mutex;
+    std::string captured_log;
     std::thread thread;
 };
 
@@ -245,6 +290,34 @@ KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limi
     KJ_EXPECT(client2->client->add(2, 3) == 5);
 }
 
+// Without `accept_pending`, cascaded close handlers each post a duplicate
+// accept(). KJ silently serializes them so the cap isn't exceeded, but the
+// extra pending promises are destroyed at cleanup and logged as
+// "PromiseFulfiller was destroyed without fulfilling the promise."
+// This test fails when accept_pending is removed and passes when it's intact.
+KJ_TEST("ListenConnections does not leak accept promises during disconnect burst")
+{
+    constexpr size_t kCap = 2;
+    ListenSetup setup(/*max_connections=*/kCap);
+
+    std::vector<std::unique_ptr<ClientSetup>> filling;
+    filling.reserve(kCap);
+    for (size_t i = 0; i < kCap; ++i) {
+        filling.push_back(std::make_unique<ClientSetup>(setup.listener.Connect()));
+    }
+    setup.WaitForConnectedCount(kCap);
+
+    filling.clear();
+    setup.WaitForDisconnectedCount(kCap);
+
+    // Trigger m_task_set.reset() now so any leaked accept promises get destroyed
+    // before we read captured_log.
+    setup.forceShutdown();
+
+    std::lock_guard<std::mutex> lock(setup.captured_log_mutex);
+    KJ_EXPECT(setup.captured_log.find("PromiseFulfiller was destroyed") == std::string::npos);
+}
+
 } // namespace
 } // namespace test
 } // namespace mp

Basically what this does is install a kj::ExceptionCallback in the server thread to capture the log "PromiseFulfiller was destroyed" generated by KJ runtime if cascaded disconnects each call _Listen and post a fresh accept() promise. The test fails with accept_pending removed and pass with it back.

Copy link
Copy Markdown
Contributor

@xyzconstant xyzconstant May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's not so obvious why this field is needed here, this patch basically guarantee the same outcome and also pass the test generated by Claude above:

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 78924c6..c002811 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -851,11 +851,6 @@ struct ListenState
     kj::Own<kj::ConnectionReceiver> listener;
     std::optional<size_t> max_connections;
     size_t active_connections{0};
-    //! Tracks whether accept() has already been posted. This is needed because
-    //! active_connections only counts accepted connections, so without a
-    //! separate flag, nested _Listen() calls could queue multiple pending
-    //! accepts before active_connections increases.
-    bool accept_pending{false};
 };
 
 template <typename InitInterface, typename InitImpl>
@@ -866,9 +861,12 @@ void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<Liste
 {
     ++state->active_connections;
     _Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
+        const bool was_at_cap = state->max_connections && state->active_connections == *state->max_connections;
         assert(state->active_connections > 0);
         --state->active_connections;
-        _Listen<InitInterface>(loop, init, state);
+        if (was_at_cap) {
+            _Listen<InitInterface>(loop, init, state);
+        }
     });
 }
 
@@ -885,15 +883,12 @@ inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, con
 template <typename InitInterface, typename InitImpl>
 void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
 {
-    if (state->accept_pending) return;
     if (_ListenAtCapacity(*state)) return;
 
-    state->accept_pending = true;
     auto* ptr = state->listener.get();
     auto accept_ref{_MakeCappedListenerRef(loop, *state)};
     loop.m_task_set->add(ptr->accept().then(
         [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
-            state->accept_pending = false;
             _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
             _Listen<InitInterface>(loop, init, state);
         }));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a better invariant than tracking accept_pending.

There should already be one pending accept whenever the capped listener is below capacity, and disconnects only need to post a new accept when they transition the listener from full to below full, because that is the only state where nocaccept was pending.

So checking this before decrementing active_connections avoids the duplicate-accept case without adding another state variable.

Taken and decided to use resume_accept for the local boolean instead

Comment thread test/mp/test/test.cpp Outdated

namespace {

class UnixListener
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (84ed607)

Would be good to introduce the test in a separate commit and maybe separate file if it doesn't share much code with existing test.

IMO would be nice if it test was introduced in an intial commit then updated after connection limits are added so it's easier to see how connection limits are tested separately from connection setup.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split the listener coverage out into a dedicated listen_tests.cpp file and reordered the history so the baseline ListenConnections() test is introduced first, then extended in follow-up commit with the local connection limit coverage.

@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch 4 times, most recently from 57e7070 to 8511c68 Compare April 9, 2026 10:20
@enirox001
Copy link
Copy Markdown
Contributor Author

Thanks for the review @ryanofsky .

I addressed the cleanup points in the latest push:

  • _Serve is now explicit at all call sites
  • accept_pending is documented,
  • listener coverage now lives in a dedicated listen_tests.cpp file introduced in a separate commit before the local-limit extension.

I’m also planning to put together a draft Bitcoin Core PR using this API so the per-address approach can be evaluated downstream against the current global-limit direction.

@enirox001
Copy link
Copy Markdown
Contributor Author

enirox001 commented Apr 9, 2026

I put together the downstream draft using this API here bitcoin/bitcoin#35037

It uses per--ipcbind max-connections=<n> options, threads the parsed per-address limit into ListenConnections(), and adds downstream coverage for the local-limit behavior.

@enirox001
Copy link
Copy Markdown
Contributor Author

This PR is now ready for review

Copy link
Copy Markdown
Contributor

@xyzconstant xyzconstant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review ACK 8511c68

Reviewed each commit separately, compiled and ran tests. The changes look good to me, only left a couple of inline nits noting a compilation error + failing tests in the first commit.

}
});
FooImplementation foo;
ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This will fail to compile mptest at commit 8c47a3a since the 4th parameter max_connections is not introduced in ListenConnections's signature until next commit. I'd fix the call for this commit and then adjust the setup accordingly in 8511c68.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks

KJ_EXPECT(client->client->add(1, 2) == 3);
}

KJ_TEST("ListenConnections enforces a local connection limit")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Unlike in ListenConnections's call issue (compilation error), this will still fail for this commit (8c47a3a). Following ryanosfky's reasoning (#269 (comment)) I believe this suite would be better introduced in 8511c68 so the test lands with the feature it exercises.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@DrahtBot DrahtBot requested a review from ryanofsky May 13, 2026 03:15
@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from 8511c68 to b36e98b Compare May 14, 2026 08:43
@enirox001
Copy link
Copy Markdown
Contributor Author

Thanks for the review @xyzconstant . Fixed both commit-structure issues: the first test commit now only adds baseline ListenConnections() coverage using the existing 3-argument API, and the max_connections setup/test now lands with the feature commit that introduces the new parameter

@enirox001
Copy link
Copy Markdown
Contributor Author

enirox001 commented May 14, 2026

Also tightened the capped listener behavior. It now stops posting accepts once the limit is reached, and keeps capped pending accepts alive so later clients can connect after an idle gap, including before the cap has been reached.

Added coverage for the reconnect cases as well

@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from b36e98b to 19e1386 Compare May 14, 2026 08:51
Comment thread include/mp/proxy-io.h Outdated
[&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
state->accept_pending = false;
_ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
if (_ListenAtCapacity(*state)) return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (19e1386)

Not sure if I'm missing something but this check here seems redundant with the same _ListenAtCapacity check at the start of _Listen (line 889).

I tested commenting this line out and left the upper-level check (and vice-versa) and the tests passed with no issues. I'd suggest dropping any of these duplicates.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks. Dropped the lower _ListenAtCapacity() check since _Listen() already handles the capacity check before posting another accept.

@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from 19e1386 to b0207dd Compare May 19, 2026 09:05
Comment thread include/mp/proxy-io.h
[&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
state->accept_pending = false;
_ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
_Listen<InitInterface>(loop, init, state);
Copy link
Copy Markdown
Contributor

@xyzconstant xyzconstant May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the accept_pending check in place, this _Listen call will never be reached.

NOTE: if you apply this patch here (comment), then it will be needed here because we can't rely on the second _Listen call in _ServeAccepted which is executed conditionally after active_connections reached the cap.

@xyzconstant
Copy link
Copy Markdown
Contributor

Thanks for the update @enirox001!

I've been playing around with the PR code more throughly this time and have a different take on accept_pending now, so left a few comments.

Overall the code is factually correct and works as expected. And I think it could be merged (despite my latest thoughts on accept_pending). So I'll just re-ACK b0207dd

@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch 2 times, most recently from 3f8823b to 7e51ab2 Compare May 21, 2026 11:37
@enirox001
Copy link
Copy Markdown
Contributor Author

enirox001 commented May 21, 2026

I've been playing around with the PR code more throughly this time and have a different take on accept_pending now, so left a few comments.

Thanks for taking another look @xyzconstant

I dropped accept_pending and switched to the simpler invariant you suggested.

@xyzconstant
Copy link
Copy Markdown
Contributor

re-ACK 7e51ab2

Thanks for the updates @enirox001!

Copy link
Copy Markdown
Collaborator

@ryanofsky ryanofsky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally spent some time looking at this (7e51ab2), and broadly the changes look very good. Left a few questions & suggestions and plan to review more

public:
explicit ClientSetup(int fd)
: thread([this, fd] {
EventLoop loop("mptest-client", [](mp::LogMessage log) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "test: add dedicated ListenConnections coverage" (d0cff01)

Here and below, it might be good to add KJ_LOG(INFO, log.level, log.message); to show debug logs when --verbose is used, like the other test file.

I think it could also be good to add a more generic test setup class that can be reused in tests and deduplicate the EventLoop thread/join/promise code that is now repeated in 3 classes. It could have std::function hooks for running code before loop.loop(), or when messages are logged. But this is more of an idea for a followup, current code seems fine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added KJ_LOG(INFO, log.level, log.message); to the logging callbacks of EventLoop in both ClientSetup and ListenSetup to ensure debug logs print correctly under --verbose .

Regarding the generic test setup helper, that is a great suggestion to deduplicate the event loop/thread boilerplates. I agree it would be a clean follow-up improvement.

Comment thread test/mp/test/listen_tests.cpp Outdated
: thread([this] {
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "test: add dedicated ListenConnections coverage" (d0cff01)

Maybe it would be good to add a testing hook for this (grep for testing_hook_ for examples), instead of checking for a log message.

Checking for a log message does seem ok though, and may be even better if the goal is to catch regressions since it doesn't require changing test and non-test code at the same time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I added a testing_hook_connected member to EventLoop and invoked it inside _Serve .

ListenSetup now hooks into this callback to count connections, which makes the tests more robust and consistent with the other testing_hook hooks in the codebase.


std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
std::thread thread;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "test: add dedicated ListenConnections coverage" (d0cff01)

Other test class has a comment that may be useful here and below as well

    //! Thread variable should be after other struct members so the thread does
    //! not start until the other members are initialized.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added this comment where appropriate.

Comment thread test/mp/test/test.cpp Outdated
#include <mp/test/foo.capnp.h>
#include <mp/test/foo.capnp.proxy.h>

#include <string.h> // NOLINT(modernize-deprecated-headers)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "test: add dedicated ListenConnections coverage" (d0cff01)

Unclear what reason is for this change. Probably should be a separate commit if it's necessary

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed this header.

Comment thread include/mp/version.h Outdated
//! updated, noting any significant or incompatible changes made since the
//! previous version.
#define MP_MAJOR_VERSION 10
#define MP_MAJOR_VERSION 11
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "test: add dedicated ListenConnections coverage" (d0cff01)

Looks like this needs to be rebased since current version is already 11, and this should be bumping from 11>12.

Also would suggest bumping version in an initial new commit before the other changes, instead of alongside them. (For example see b15d63e from #274.) Splitting should the make the PR a little easier to review, and also allow creating a v11.0 tag that contains every commit MP_MAJOR_VERSION 11 set, and points at a merge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, split the version bumping from the other changes, and included it in the initial commit before other commits

Comment thread include/mp/proxy-io.h
}

template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

This function used to have a code comment "//! Given connection receiver and an init object, handle..." that looks like it became detached. Would be good to move it down next this function and update it ("Given init object and a state object containing a connection receiver, handle...").

It might also be clearer to turn this into a ListenState method. (And if doing that, rename ListenState to Listener, rename listener to m_receiver, rename state to self, make ListenAtCapacity another method, etc). Just an idea though, I haven't studied this code enough yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The comment has been moved down above the _Listen function, and its wording has been updated to reflect the new init and state parameters as suggested.

Regarding refactoring _Listen into a member method of ListenState. I decided to keep it as a free function instead.

The _Listen template function registers asynchronous handlers via ptr->accept().then(...) , turning it into a method would require ListenState to inherit from std::enable_shared_from_this to pass self safely into the lambdas.

I think keeping it as a simple struct/free function avoids this boilerplate and remains consistent with the other helper functions in proxy-io.h

Comment thread include/mp/proxy-io.h Outdated

template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state, kj::Own<kj::AsyncIoStream>&& stream)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

Having an extra _ServeAccepted function that is only called one place seems to make this code more complicated for little benefit. Any reason not to call _Serve directly from _Listen like previous code did?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is redundant. I removed the _ServeAccepted helper function entirely and placed the logic directly inside the _Listen callback. This reduces indirection and keeps the flow clean.

KJ_EXPECT(client1->client->add(1, 2) == 3);

auto client2 = std::make_unique<ClientSetup>(setup.listener.Connect());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

Note: Sleep seems racy but harmless since that it shouldn't ever cause the test to fail when it should succeed, only to potentially succeed when it should fail. I can't think of a better way to check that a connection is NOT accepted other than maybe adding a testing_hook that exposes listening state, though so this might be the best approach for now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since asserting that the second connection was not accepted inherently requires a brief window of time to pass,

The short sleep seems to be the most straightforward and reliable approach to test this behavior without adding state tracking hooks. I've left the sleep in place.

Comment thread include/mp/proxy-io.h

inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, const ListenState& state)
{
return state.max_connections && *state.max_connections > 0 ? std::make_unique<EventLoopRef>(loop) : nullptr;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)

Would be helpful to have a code comment explaining this logic. I don't think I understand why it avoiding creating an EventLoopRef if there is no connection limit. I'm also not clear on why passing accept_ref around separately is needed. Naively I think I'd just expect ListenState to have an EventLoopRef member.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I added explanatory comments to both _MakeCappedListenerRef. I avoid holding EventLoopRef in the uncapped case because the loop is always listening, and holding a ref would prevent it from automatically exiting when all client connections close.

Passing accept_ref in the callback instead of storing it as a member would bind the ref's lifetime directly to the active accept() task. This means that when the listener reaches capacity, we return early and don't register a new task, which automatically destroys the ref and lets the loop exit.

@DrahtBot DrahtBot requested a review from ryanofsky June 2, 2026 17:04
@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from 7e51ab2 to 458cd5a Compare June 3, 2026 15:05
Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets.

The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits.
@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from 458cd5a to cc36777 Compare June 3, 2026 15:48
Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects.

Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state.

Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects.
@enirox001 enirox001 force-pushed the 04-26-ipc-local-connection-limit branch from cc36777 to 1e8776f Compare June 3, 2026 15:56
@enirox001
Copy link
Copy Markdown
Contributor Author

Thanks for the review @ryanofsky, made the following changes in the latest push:

  • Added the KJ_LOG for verbose logging
  • Added a testing_hook_connected hook to ListenSetup.
  • Added thread comments where appropriate
  • Removed the redundant string header
  • Added an initial commit to split the version bumping from other changes
  • Added back the removed comment in the _Listen helper function
  • Removed _ServeAccepted helper function and placed logic in _Listen callback
  • Added explanatory comments to _MakeCappedListenerRef

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants