23 changes: 20 additions & 3 deletions src/server/mcp_server.cc
@@ -818,11 +818,16 @@ void McpServer::onConnectionLifecycleEvent(network::Connection* connection,
   assert(main_dispatcher_ && main_dispatcher_->isThreadSafe() &&
          "onConnectionLifecycleEvent off dispatcher");
   // Runs in dispatcher thread via the per-connection adapter.
+  //
+  // Only close events reach us here in practice. ConnectionImpl does not
+  // raise Connected for server-accepted sockets (it is raised only from
+  // the client-side connect-completion path), so a Connected case would
+  // be dead code. The matching connections_total/connections_active
+  // increment lives in onNewConnection, which is the server-side
+  // equivalent checkpoint.
   switch (event) {
     case network::ConnectionEvent::Connected:
     case network::ConnectionEvent::ConnectedZeroRtt:
-      server_stats_.connections_total++;
-      server_stats_.connections_active++;
       return;
     case network::ConnectionEvent::RemoteClose:
     case network::ConnectionEvent::LocalClose:
@@ -1312,8 +1317,20 @@ void McpServer::onNewConnection(network::ConnectionPtr&& connection) {
   // Set current connection for request processing context
   current_connection_ = conn_ptr;

-  // Update connection count
+  // Update connection count.
+  //
+  // We bump the public stats here rather than wait for a Connected event
+  // via the per-connection lifecycle adapter, because ConnectionImpl
+  // never raises Connected for server-accepted sockets -- the event is
+  // only raised on the client-side connect-completion path. onNewConnection
+  // is the equivalent checkpoint for the server side: the socket is
+  // already accepted and the filter chain is built by the time we run
+  // here, and this call is on the dispatcher thread, so the atomic
+  // increments happen on the same thread as the decrement in
+  // onConnectionLifecycleEvent.
   ++num_connections_;
+  server_stats_.connections_total++;
+  server_stats_.connections_active++;

   // Store the connection to keep it alive
   // Following production pattern: server owns connections in dispatcher thread
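The counting scheme the two comments above describe fits in a few lines outside the diff. This is a minimal, self-contained sketch, not the project's real types: `ServerStats`, the event enum, and the handler signatures are simplified stand-ins, and the counters are assumed to be `std::atomic<uint64_t>` to match the `.load()` calls in the tests below.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-ins for the production types; only the counting
// logic is of interest here.
struct ServerStats {
  std::atomic<uint64_t> connections_total{0};   // monotonic: every accept
  std::atomic<uint64_t> connections_active{0};  // accepts minus closes
};

enum class ConnectionEvent { Connected, ConnectedZeroRtt, RemoteClose, LocalClose };

class Server {
 public:
  // Server-side checkpoint: ConnectionImpl never raises Connected for
  // accepted sockets, so the increments have to live here in the accept
  // path, which already runs on the dispatcher thread.
  void onNewConnection() {
    stats_.connections_total++;
    stats_.connections_active++;
  }

  // Also runs on the dispatcher thread; only close events arrive in
  // practice, so only the decrement is live code.
  void onConnectionLifecycleEvent(ConnectionEvent event) {
    switch (event) {
      case ConnectionEvent::RemoteClose:
      case ConnectionEvent::LocalClose:
        stats_.connections_active--;  // pairs with the accept-side increment
        break;
      default:
        break;  // Connected / ConnectedZeroRtt: unreachable for server sockets
    }
  }

  ServerStats stats_;
};
```

Because both the increment and the decrement run on the single dispatcher thread, the atomics are needed only so other threads (here, the test thread) can read the counters safely.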
120 changes: 120 additions & 0 deletions tests/integration/test_mcp_server_connection_lifecycle.cc
@@ -119,6 +119,16 @@ class McpServerConnectionLifecycleTest : public ::testing::Test {

    ASSERT_TRUE(waitForListenerReady(port_, 5s))
        << "Server did not begin accepting on port " << port_;

    // The readiness probe in waitForListenerReady opens a TCP
    // connection and closes it. The kernel accepts the SYN and queues
    // the connection; the server-side onNewConnection may run slightly
    // after this point and bump connections_total / connections_active.
    // Wait for the counters to stop moving so tests that take a
    // baseline snapshot see a stable starting value. Polling for
    // "unchanged for one tick" is cheaper than guessing a sleep long
    // enough to cover the slowest machine.
    waitForConnectionStatsIdle(200ms, 2s);
  }

  void TearDown() override {
@@ -210,11 +220,121 @@ class McpServerConnectionLifecycleTest : public ::testing::Test {
    return false;
  }

  // Poll until connections_total has not changed for `quiet_for` or
  // the overall budget elapses. Used after SetUp to let any in-flight
  // probe accepts land before a test snapshots the baseline.
  void waitForConnectionStatsIdle(std::chrono::milliseconds quiet_for,
                                  std::chrono::milliseconds budget) {
    const auto deadline = std::chrono::steady_clock::now() + budget;
    uint64_t last = server_->getServerStats().connections_total.load();
    auto stable_since = std::chrono::steady_clock::now();
    while (std::chrono::steady_clock::now() < deadline) {
      std::this_thread::sleep_for(10ms);
      uint64_t now_total = server_->getServerStats().connections_total.load();
      if (now_total != last) {
        last = now_total;
        stable_since = std::chrono::steady_clock::now();
        continue;
      }
      if (std::chrono::steady_clock::now() - stable_since >= quiet_for) {
        return;
      }
    }
  }

  // Wait for connections_active to match `expected` within the budget.
  // Poll because the counter is updated on the dispatcher thread in
  // response to async events.
  bool waitForActiveConnections(uint64_t expected,
                                std::chrono::milliseconds budget) {
    const auto deadline = std::chrono::steady_clock::now() + budget;
    while (std::chrono::steady_clock::now() < deadline) {
      if (server_->getServerStats().connections_active.load() == expected) {
        return true;
      }
      std::this_thread::sleep_for(10ms);
    }
    return server_->getServerStats().connections_active.load() == expected;
  }

  uint16_t port_{0};
  std::unique_ptr<server::McpServer> server_;
  std::thread server_thread_;
};

// connections_active / connections_total are maintained correctly for
// server-accepted TCP sockets. ConnectionImpl does not raise Connected
// for server sockets -- it is only raised on the client-side
// connect-completion path -- so the increment can't piggyback on the
// lifecycle adapter's Connected branch. Instead it lives in
// McpServer::onNewConnection. This test pins that down: the counter
// has to go up on connect and back down on close, symmetrically,
// across multiple concurrent connections.
TEST_F(McpServerConnectionLifecycleTest, AcceptedConnectionsAreCounted) {
  // Snapshot the baseline rather than assume zero. waitForListenerReady
  // in SetUp opens and closes a probe socket, and the kernel may queue
  // that as a server-accepted connection whose close does not reach
  // onConnectionLifecycleEvent within the test budget (ConnectionImpl
  // on a server socket registers the read-EOF listener only once the
  // filter chain is wired up). Measuring deltas against whatever the
  // server has already observed keeps the test honest without depending
  // on the probe's teardown.
  const auto& stats = server_->getServerStats();
  const uint64_t base_active = stats.connections_active.load();
  const uint64_t base_total = stats.connections_total.load();

  auto a = openClient();
  auto b = openClient();
  auto c = openClient();
  ASSERT_NE(a, nullptr);
  ASSERT_NE(b, nullptr);
  ASSERT_NE(c, nullptr);

  // onNewConnection runs on the server's dispatcher thread in response
  // to the listener's accept; poll until each of the three accepts has
  // been counted (relative to the baseline).
  ASSERT_TRUE(waitForActiveConnections(base_active + 3u, 2s))
      << "connections_active never reached baseline+3 after three "
         "client connects";
  EXPECT_GE(stats.connections_total.load(), base_total + 3u);

  // Shut the server down. The drain path closes each live connection
  // on the dispatcher thread (LocalClose), which runs through
  // onConnectionLifecycleEvent and decrements connections_active. If
  // the pre-fix code path were still in place the counter would not
  // have been incremented on accept, so the drain's N decrements would
  // wrap it around to UINT64_MAX; observing it return cleanly to zero
  // is direct proof that increment and decrement agree.
  //
  // We use the drain path rather than client-initiated close because a
  // raw TCP client that never sent any application bytes may not drive
  // the server-side ConnectionImpl to observe the FIN promptly -- that
  // is an HTTP filter behavior we don't want to couple the stats test
  // to. The drain is deterministic and dispatcher-driven.
  server_->shutdown();
  if (server_thread_.joinable()) {
    server_thread_.join();
  }

  EXPECT_EQ(stats.connections_active.load(), 0u)
      << "connections_active did not return to zero after server drain";

  // total is monotonic -- never decremented -- and must reflect every
  // accept we observed, regardless of what closed.
  EXPECT_GE(stats.connections_total.load(), base_total + 3u);

  // Drop the clients so the test process releases their fds cleanly.
  // The server is already gone, so their reads would return EOF.
  a->close();
  a.reset();
  b->close();
  b.reset();
  c->close();
  c.reset();

  // TearDown's shutdown() will no-op on the already-stopped server.
}

// Repeated connect/close cycles don't wedge the listener. Each cycle
// relies on the lifecycle adapter firing RemoteClose on the dispatcher
// and erasing the connection from active_connections_ /
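For context on the probe the SetUp comments keep referring to: `waitForListenerReady` itself is outside this diff, so its body is not shown above. A plausible sketch, assuming it is a plain POSIX connect-then-close loop against loopback (the name and signature are taken from the call sites; the implementation here is inferred, not copied from the repository):

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>
#include <cstdint>
#include <thread>

// Try to connect() to the loopback port until it succeeds or the budget
// elapses; close the socket immediately on success. That close is the
// probe teardown the test comments discuss.
bool waitForListenerReady(uint16_t port, std::chrono::milliseconds budget) {
  const auto deadline = std::chrono::steady_clock::now() + budget;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  while (std::chrono::steady_clock::now() < deadline) {
    const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd >= 0) {
      if (::connect(fd, reinterpret_cast<const sockaddr*>(&addr),
                    sizeof(addr)) == 0) {
        ::close(fd);  // probe succeeded; server may still be counting it
        return true;
      }
      ::close(fd);
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  return false;
}
```

A probe built this way explains exactly the race `waitForConnectionStatsIdle` exists to absorb: the connect returns as soon as the kernel completes the handshake, possibly before the server's dispatcher has run onNewConnection for it.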
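One detail in the drain comment is worth pinning down outside the test: decrementing an unsigned 64-bit counter that holds zero is well-defined in C++ and wraps to `UINT64_MAX`, so an unmatched decrement shows up as an absurdly large gauge rather than a crash. A standalone check:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

int main() {
  std::atomic<uint64_t> active{0};
  active--;  // 0 - 1 wraps: unsigned arithmetic is modular, never traps
  assert(active.load() == UINT64_MAX);
  return 0;
}
```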