Skip to content

Commit d7db836

Browse files
gophergogocaleb2h
authored andcommitted
Bind server connection callbacks per connection via adapter (#212)
McpServer registered itself on every connection and relied on a global current_connection_ pointer to identify which connection fired a close event. That pointer was stale whenever more than one connection was active, so session teardown ran against the wrong session (or none). Introduce ConnectionLifecycleCallbacks — a tiny adapter bound to a single (server, connection) pair. Each connection gets its own instance, held in lifecycle_callbacks_ until close. The close path now knows exactly which connection is dying and tears down its session directly. Both the connection and the adapter are handed to the dispatcher's deferred-delete queue on close, since we are inside the adapter's own onEvent() when we drop them. Adds a dispatcher-thread assert at the entry so any off-thread caller trips immediately.
1 parent 374acc7 commit d7db836

2 files changed

Lines changed: 112 additions & 29 deletions

File tree

include/mcp/server/mcp_server.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,45 @@ class McpServer : public application::ApplicationBase,
827827
void stopBackgroundTasks();
828828

829829
private:
830+
// Per-connection lifecycle callback adapter.
831+
//
832+
// Why: ConnectionCallbacks::onEvent carries no Connection identity. Before
833+
// this adapter, McpServer registered itself on every connection and then
834+
// fell back to a global `current_connection_` pointer to figure out which
835+
// one closed — which is stale whenever multiple connections are active.
836+
// Each adapter binds (server, connection) at construction so the close
837+
// path knows exactly which connection is dying.
838+
//
839+
// Also DeferredDeletable: the adapter is registered on the connection, so
840+
// destroying it synchronously while we are inside its own onEvent() would
841+
// be a use-after-free. The server hands it to dispatcher.deferredDelete()
842+
// alongside the connection itself.
843+
class ConnectionLifecycleCallbacks : public network::ConnectionCallbacks,
844+
public event::DeferredDeletable {
845+
public:
846+
ConnectionLifecycleCallbacks(McpServer& server,
847+
network::Connection* connection)
848+
: server_(server), connection_(connection) {}
849+
850+
void onEvent(network::ConnectionEvent event) override {
851+
server_.onConnectionLifecycleEvent(connection_, event);
852+
}
853+
void onAboveWriteBufferHighWatermark() override {}
854+
void onBelowWriteBufferLowWatermark() override {}
855+
856+
private:
857+
McpServer& server_;
858+
network::Connection* connection_;
859+
};
860+
861+
// Dispatcher-thread only: all entries added/removed inside dispatcher.
862+
std::unordered_map<network::Connection*,
863+
std::unique_ptr<ConnectionLifecycleCallbacks>>
864+
lifecycle_callbacks_;
865+
866+
void onConnectionLifecycleEvent(network::Connection* connection,
867+
network::ConnectionEvent event);
868+
830869
// Internal callbacks class to bridge McpProtocolCallbacks to McpServer
831870
// Following production pattern: separate callback interface from main class
832871
class ServerProtocolCallbacks : public McpProtocolCallbacks {

src/server/mcp_server.cc

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "mcp/server/mcp_server.h"
88

99
#include <algorithm>
10+
#include <cassert>
1011
#include <future>
1112
#include <sstream>
1213

@@ -762,47 +763,85 @@ void McpServer::onResponse(const jsonrpc::Response& response) {
762763
}
763764

764765
void McpServer::onConnectionEvent(network::ConnectionEvent event) {
765-
// Handle connection events
766+
// Stats-only path. Per-connection cleanup lives in
767+
// onConnectionLifecycleEvent, invoked by the ConnectionLifecycleCallbacks
768+
// adapter which carries the closing connection's identity. This function
769+
// still runs for stdio transports (reached via ServerProtocolCallbacks)
770+
// where there is no active_connections_ entry to unwind.
766771
switch (event) {
767772
case network::ConnectionEvent::Connected:
768773
case network::ConnectionEvent::ConnectedZeroRtt:
769774
server_stats_.connections_total++;
770775
server_stats_.connections_active++;
771776
break;
772-
773777
case network::ConnectionEvent::RemoteClose:
774778
case network::ConnectionEvent::LocalClose:
775779
server_stats_.connections_active--;
780+
break;
781+
}
782+
}
776783

777-
// Clean up session for this connection
778-
if (session_manager_ && current_connection_) {
779-
// Remove session associated with the closed connection
780-
session_manager_->removeSessionByConnection(current_connection_);
781-
782-
// Remove from connection-session mapping
783-
// Following production pattern: use lock since this map may be accessed
784-
// from multiple dispatcher threads
785-
{
786-
std::lock_guard<std::mutex> lock(connection_sessions_mutex_);
787-
connection_sessions_.erase(current_connection_);
788-
}
784+
void McpServer::onConnectionLifecycleEvent(network::Connection* connection,
785+
network::ConnectionEvent event) {
786+
// Connection events fire from the connection's own callback loop, which is
787+
// the dispatcher thread. Enforcing this here catches any accidental
788+
// off-thread callers introduced later.
789+
assert(main_dispatcher_ && main_dispatcher_->isThreadSafe() &&
790+
"onConnectionLifecycleEvent off dispatcher");
791+
// Runs in dispatcher thread via the per-connection adapter.
792+
switch (event) {
793+
case network::ConnectionEvent::Connected:
794+
case network::ConnectionEvent::ConnectedZeroRtt:
795+
server_stats_.connections_total++;
796+
server_stats_.connections_active++;
797+
return;
798+
case network::ConnectionEvent::RemoteClose:
799+
case network::ConnectionEvent::LocalClose:
800+
break;
801+
}
789802

790-
// Remove connection from active list
791-
// Following production pattern: all in dispatcher thread, no mutex
792-
// needed
793-
active_connections_.remove_if(
794-
[this](const network::ConnectionPtr& conn) {
795-
return conn.get() == current_connection_;
796-
});
803+
server_stats_.connections_active--;
797804

798-
// Clear the current connection
799-
current_connection_ = nullptr;
805+
// Tear down session state keyed by the actual closing connection, not a
806+
// global current_connection_ which may point at a different session.
807+
if (session_manager_) {
808+
session_manager_->removeSessionByConnection(connection);
809+
}
810+
{
811+
std::lock_guard<std::mutex> lock(connection_sessions_mutex_);
812+
connection_sessions_.erase(connection);
813+
}
800814

801-
// Decrement connection count
802-
num_connections_--;
815+
// Hand both the ConnectionPtr and the lifecycle adapter to the dispatcher's
816+
// deferred-delete queue. We are currently inside the adapter's onEvent(),
817+
// and the connection is iterating its own callback list — destroying either
818+
// synchronously would be a use-after-free. deferredDelete drains after the
819+
// current callback stack unwinds.
820+
//
821+
// Skip container unwinding if we're past shutdown: during ~McpServer the
822+
// containers themselves are being destroyed, and mutating them from a
823+
// close-triggered callback would be UB. Shutdown already sets this flag
824+
// before the destructor starts tearing members down.
825+
if (server_running_) {
826+
for (auto it = active_connections_.begin();
827+
it != active_connections_.end(); ++it) {
828+
if (it->get() == connection) {
829+
main_dispatcher_->deferredDelete(std::move(*it));
830+
active_connections_.erase(it);
831+
break;
803832
}
804-
break;
833+
}
834+
auto cb_it = lifecycle_callbacks_.find(connection);
835+
if (cb_it != lifecycle_callbacks_.end()) {
836+
main_dispatcher_->deferredDelete(std::move(cb_it->second));
837+
lifecycle_callbacks_.erase(cb_it);
838+
}
839+
}
840+
841+
if (current_connection_ == connection) {
842+
current_connection_ = nullptr;
805843
}
844+
num_connections_--;
806845
}
807846

808847
void McpServer::onError(const Error& error) {
@@ -1223,9 +1262,14 @@ void McpServer::onNewConnection(network::ConnectionPtr&& connection) {
12231262
return;
12241263
}
12251264

1226-
// Add ourselves as connection callbacks to track lifecycle
1227-
// This allows us to clean up session when connection closes
1228-
connection->addConnectionCallbacks(*this);
1265+
// Register a per-connection lifecycle adapter so the close event tells us
1266+
// exactly which connection is dying. The adapter is held alive by the
1267+
// lifecycle_callbacks_ map until the close path hands it to the dispatcher
1268+
// for deferred deletion.
1269+
auto lifecycle_cb =
1270+
std::make_unique<ConnectionLifecycleCallbacks>(*this, conn_ptr);
1271+
connection->addConnectionCallbacks(*lifecycle_cb);
1272+
lifecycle_callbacks_[conn_ptr] = std::move(lifecycle_cb);
12291273

12301274
// Store connection-to-session mapping
12311275
// Following production pattern: listener owns connections

0 commit comments

Comments
 (0)