-
Notifications
You must be signed in to change notification settings - Fork 51
proxy: add local connection limit to ListenConnections #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,9 @@ | |
| #include <capnp/rpc-twoparty.h> | ||
|
|
||
| #include <assert.h> | ||
| #include <algorithm> | ||
| #include <condition_variable> | ||
| #include <cstdlib> | ||
| #include <functional> | ||
| #include <kj/function.h> | ||
| #include <map> | ||
|
|
@@ -356,6 +358,9 @@ class EventLoop | |
|
|
||
| //! Hook called on the worker thread just before returning results. | ||
| std::function<void()> testing_hook_async_request_done; | ||
|
|
||
| //! Hook called on the server thread when the client has connected. | ||
| std::function<void()> testing_hook_connected; | ||
| }; | ||
|
|
||
| //! Single element task queue used to handle recursive capnp calls. (If the | ||
|
|
@@ -831,8 +836,8 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f | |
| //! handles requests from the stream by calling the init object. Embed the | ||
| //! ProxyServer in a Connection object that is stored and erased if | ||
| //! disconnected. This should be called from the event loop thread. | ||
| template <typename InitInterface, typename InitImpl> | ||
| void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init) | ||
| template <typename InitInterface, typename InitImpl, typename OnDisconnect> | ||
| void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect) | ||
| { | ||
| loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) { | ||
| // Disable deleter so proxy server object doesn't attempt to delete the | ||
|
|
@@ -842,23 +847,70 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init | |
| }); | ||
| auto it = loop.m_incoming_connections.begin(); | ||
| MP_LOG(loop, Log::Info) << "IPC server: socket connected."; | ||
| it->onDisconnect([&loop, it] { | ||
| if (loop.testing_hook_connected) loop.testing_hook_connected(); | ||
| it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable { | ||
| MP_LOG(loop, Log::Info) << "IPC server: socket disconnected."; | ||
| loop.m_incoming_connections.erase(it); | ||
| on_disconnect(); | ||
| }); | ||
| } | ||
|
|
||
| //! Given connection receiver and an init object, handle incoming connections by | ||
| //! calling _Serve, to create ProxyServer objects and forward requests to the | ||
| //! init object. | ||
| struct ListenState | ||
| { | ||
| explicit ListenState(kj::Own<kj::ConnectionReceiver>&& listener_, std::optional<size_t> max_connections_) | ||
| : listener(kj::mv(listener_)), max_connections(max_connections_) {} | ||
|
|
||
| kj::Own<kj::ConnectionReceiver> listener; | ||
| std::optional<size_t> max_connections; | ||
| size_t active_connections{0}; | ||
| }; | ||
|
|
||
| template <typename InitInterface, typename InitImpl> | ||
| void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state); | ||
|
|
||
| inline bool _ListenAtCapacity(const ListenState& state) | ||
| { | ||
| return state.max_connections && state.active_connections >= *state.max_connections; | ||
| } | ||
|
|
||
| //! Return an EventLoopRef if this is a capped listener, to keep the event loop | ||
| //! alive while waiting for connections. | ||
| //! | ||
| //! For uncapped listeners, we do not hold an EventLoopRef because the event | ||
| //! loop should automatically exit when the last active client disconnects (when | ||
| //! m_num_clients drops to 0). | ||
| //! | ||
| //! For capped listeners, we stop calling ptr->accept() when at capacity. Since | ||
| //! no accept task remains pending in the task set, we must hold an EventLoopRef | ||
| //! while listening to keep the event loop alive until the limit is reached. | ||
| inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, const ListenState& state) | ||
| { | ||
| return state.max_connections && *state.max_connections > 0 ? std::make_unique<EventLoopRef>(loop) : nullptr; | ||
| } | ||
|
|
||
| //! Given init object and a state object containing a connection receiver, handle | ||
| //! incoming connections by calling _Serve, to create ProxyServer objects and | ||
| //! forward requests to the init object. | ||
| template <typename InitInterface, typename InitImpl> | ||
| void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init) | ||
| void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In commit "proxy: add local connection limit to ListenConnections" (7e51ab2) This function used to have a code comment "//! Given connection receiver and an init object, handle..." that looks like it became detached. Would be good to move it down next this function and update it ("Given init object and a state object containing a connection receiver, handle..."). It might also be clearer to turn this into a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. The comment has been moved down above the Regarding refactoring The I think keeping it as a simple struct/free function avoids this boilerplate and remains consistent with the other helper functions in |
||
| { | ||
| auto* ptr = listener.get(); | ||
| if (_ListenAtCapacity(*state)) return; | ||
|
|
||
| auto* ptr = state->listener.get(); | ||
| // Bind the EventLoopRef lifetime to the pending accept task. If the listener | ||
| // reaches capacity, _Listen returns early, destroying the accept_ref and | ||
| // letting the event loop client count decrement. | ||
| auto accept_ref{_MakeCappedListenerRef(loop, *state)}; | ||
| loop.m_task_set->add(ptr->accept().then( | ||
| [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable { | ||
| _Serve<InitInterface>(loop, kj::mv(stream), init); | ||
| _Listen<InitInterface>(loop, kj::mv(listener), init); | ||
| [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable { | ||
| ++state->active_connections; | ||
| _Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] { | ||
| const bool resume_accept{_ListenAtCapacity(*state)}; | ||
| assert(state->active_connections > 0); | ||
| --state->active_connections; | ||
| if (resume_accept) _Listen<InitInterface>(loop, init, state); | ||
| }); | ||
| _Listen<InitInterface>(loop, init, state); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the NOTE: if you apply this patch here (comment), then it will be needed here because we can't rely on the second |
||
| })); | ||
| } | ||
|
|
||
|
|
@@ -868,18 +920,21 @@ template <typename InitInterface, typename InitImpl> | |
| void ServeStream(EventLoop& loop, int fd, InitImpl& init) | ||
| { | ||
| _Serve<InitInterface>( | ||
| loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); | ||
| loop, | ||
| loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), | ||
| init, | ||
| [] {}); | ||
| } | ||
|
|
||
| //! Given listening socket file descriptor and an init object, handle incoming | ||
| //! connections and requests by calling methods on the Init object. | ||
| template <typename InitInterface, typename InitImpl> | ||
| void ListenConnections(EventLoop& loop, int fd, InitImpl& init) | ||
| void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt) | ||
| { | ||
| loop.sync([&]() { | ||
| _Listen<InitInterface>(loop, | ||
| _Listen<InitInterface>(loop, init, std::make_shared<ListenState>( | ||
| loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), | ||
| init); | ||
| max_connections)); | ||
| }); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In commit "proxy: add local connection limit to ListenConnections" (7e51ab2)
Would be helpful to have a code comment explaining this logic. I don't think I understand why it avoiding creating an EventLoopRef if there is no connection limit. I'm also not clear on why passing accept_ref around separately is needed. Naively I think I'd just expect ListenState to have an EventLoopRef member.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I added explanatory comments to both
_MakeCappedListenerRef. I avoid holdingEventLoopRefin the uncapped case because the loop is always listening, and holding arefwould prevent it from automatically exiting when all client connections close.Passing
accept_refin the callback instead of storing it as a member would bind the ref's lifetime directly to the activeaccept()task. This means that when the listener reaches capacity, we return early and don't register a new task, which automatically destroys the ref and lets the loop exit.