diff --git a/src/server/mcp_server.cc b/src/server/mcp_server.cc
index 9e871760..5fe6eca4 100644
--- a/src/server/mcp_server.cc
+++ b/src/server/mcp_server.cc
@@ -818,11 +818,16 @@ void McpServer::onConnectionLifecycleEvent(network::Connection* connection,
   assert(main_dispatcher_ && main_dispatcher_->isThreadSafe() &&
          "onConnectionLifecycleEvent off dispatcher");
   // Runs in dispatcher thread via the per-connection adapter.
+  //
+  // Only close events reach us here in practice. ConnectionImpl does not
+  // raise Connected for server-accepted sockets (it is raised only from
+  // the client-side connect-completion path), so a Connected case would
+  // be dead code. The matching connections_total/connections_active
+  // increment lives in onNewConnection, which is the server-side
+  // equivalent checkpoint.
   switch (event) {
     case network::ConnectionEvent::Connected:
     case network::ConnectionEvent::ConnectedZeroRtt:
-      server_stats_.connections_total++;
-      server_stats_.connections_active++;
       return;
     case network::ConnectionEvent::RemoteClose:
     case network::ConnectionEvent::LocalClose:
@@ -1312,8 +1317,20 @@ void McpServer::onNewConnection(network::ConnectionPtr&& connection) {
   // Set current connection for request processing context
   current_connection_ = conn_ptr;

-  // Update connection count
+  // Update connection count.
+  //
+  // We bump the public stats here rather than wait for a Connected event
+  // via the per-connection lifecycle adapter, because ConnectionImpl
+  // never raises Connected for server-accepted sockets -- the event is
+  // only raised on the client-side connect-completion path. onNewConnection
+  // is the equivalent checkpoint for the server: the socket is already
+  // accepted and the filter chain is built by the time we run here, and
+  // this call is on the dispatcher thread, so the atomic increments land
+  // on the same thread that the decrement in onConnectionLifecycleEvent
+  // runs on.
   ++num_connections_;
+  server_stats_.connections_total++;
+  server_stats_.connections_active++;

   // Store the connection to keep it alive
   // Following production pattern: server owns connections in dispatcher thread
diff --git a/tests/integration/test_mcp_server_connection_lifecycle.cc b/tests/integration/test_mcp_server_connection_lifecycle.cc
index 34beec9c..89d9dad7 100644
--- a/tests/integration/test_mcp_server_connection_lifecycle.cc
+++ b/tests/integration/test_mcp_server_connection_lifecycle.cc
@@ -119,6 +119,16 @@ class McpServerConnectionLifecycleTest : public ::testing::Test {

     ASSERT_TRUE(waitForListenerReady(port_, 5s))
         << "Server did not begin accepting on port " << port_;
+
+    // The readiness probe in waitForListenerReady opens a TCP
+    // connection and closes it. The kernel accepts the SYN and queues
+    // the connection; the server-side onNewConnection may run slightly
+    // after this point and bump connections_total / connections_active.
+    // Wait for the counters to stop moving so tests that take a
+    // baseline snapshot see a stable starting value. Polling for
+    // "unchanged for one tick" is cheaper than guessing a sleep long
+    // enough to cover the slowest machine.
+    waitForConnectionStatsIdle(200ms, 2s);
   }

   void TearDown() override {
@@ -210,11 +220,121 @@ class McpServerConnectionLifecycleTest : public ::testing::Test {
     return false;
   }

+  // Poll until connections_total has not changed for `quiet_for` or
+  // the overall budget elapses. Used after SetUp to let any in-flight
+  // probe accepts land before a test snapshots the baseline.
+  void waitForConnectionStatsIdle(std::chrono::milliseconds quiet_for,
+                                  std::chrono::milliseconds budget) {
+    const auto deadline = std::chrono::steady_clock::now() + budget;
+    uint64_t last = server_->getServerStats().connections_total.load();
+    auto stable_since = std::chrono::steady_clock::now();
+    while (std::chrono::steady_clock::now() < deadline) {
+      std::this_thread::sleep_for(10ms);
+      uint64_t now_total = server_->getServerStats().connections_total.load();
+      if (now_total != last) {
+        last = now_total;
+        stable_since = std::chrono::steady_clock::now();
+        continue;
+      }
+      if (std::chrono::steady_clock::now() - stable_since >= quiet_for) {
+        return;
+      }
+    }
+  }
+
+  // Wait for connections_active to match `expected` within the budget.
+  // Poll because the counter is updated on the dispatcher thread in
+  // response to async events.
+  bool waitForActiveConnections(uint64_t expected,
+                                std::chrono::milliseconds budget) {
+    const auto deadline = std::chrono::steady_clock::now() + budget;
+    while (std::chrono::steady_clock::now() < deadline) {
+      if (server_->getServerStats().connections_active.load() == expected) {
+        return true;
+      }
+      std::this_thread::sleep_for(10ms);
+    }
+    return server_->getServerStats().connections_active.load() == expected;
+  }
+
   uint16_t port_{0};
   std::unique_ptr<McpServer> server_;
   std::thread server_thread_;
 };

+// connections_active / connections_total are maintained correctly for
+// server-accepted TCP sockets. ConnectionImpl does not raise Connected
+// for server sockets -- it is only raised on the client-side
+// connect-completion path -- so the increment can't piggyback on the
+// lifecycle adapter's Connected branch. Instead it lives in
+// McpServer::onNewConnection. This test pins that: the counter has to
+// go up on connect and back down on close, symmetrically, across
+// multiple concurrent connections.
+TEST_F(McpServerConnectionLifecycleTest, AcceptedConnectionsAreCounted) {
+  // Snapshot the baseline rather than assume zero. waitForListenerReady
+  // in SetUp opens and closes a probe socket, and the kernel may queue
+  // that as a server-accepted connection whose close does not reach
+  // onConnectionLifecycleEvent within the test budget (ConnectionImpl
+  // on a server socket registers the read-EOF listener only once the
+  // filter chain is wired up). Measuring deltas against whatever the
+  // server has already observed keeps the test honest without depending
+  // on the probe's teardown.
+  const auto& stats = server_->getServerStats();
+  const uint64_t base_active = stats.connections_active.load();
+  const uint64_t base_total = stats.connections_total.load();
+
+  auto a = openClient();
+  auto b = openClient();
+  auto c = openClient();
+  ASSERT_NE(a, nullptr);
+  ASSERT_NE(b, nullptr);
+  ASSERT_NE(c, nullptr);
+
+  // onNewConnection runs on the server's dispatcher thread in response
+  // to the listener's accept; poll until each of the three accepts has
+  // been counted (relative to the baseline).
+  ASSERT_TRUE(waitForActiveConnections(base_active + 3u, 2s))
+      << "connections_active never reached baseline+3 after three "
+         "client connects";
+  EXPECT_GE(stats.connections_total.load(), base_total + 3u);
+
+  // Shut the server down. The drain path closes each live connection
+  // on the dispatcher thread (LocalClose), which runs through
+  // onConnectionLifecycleEvent and decrements connections_active. If
+  // the pre-fix code path were still in place the counter would not
+  // have been incremented on accept, so the drain's N decrements would
+  // wrap it into UINT64_MAX; observing it return cleanly to zero is a
+  // direct proof that increment and decrement agree.
+  //
+  // We use the drain path rather than client-initiated close because a
+  // raw TCP client that never sent any application bytes may not drive
+  // the server-side ConnectionImpl to observe the FIN promptly -- that
+  // is an HTTP filter behavior we don't want to couple the stats test
+  // to. The drain is deterministic and dispatcher-driven.
+  server_->shutdown();
+  if (server_thread_.joinable()) {
+    server_thread_.join();
+  }
+
+  EXPECT_EQ(stats.connections_active.load(), 0u)
+      << "connections_active did not return to zero after server drain";
+
+  // total is monotonic -- never decremented -- and must reflect every
+  // accept we observed, regardless of what closed.
+  EXPECT_GE(stats.connections_total.load(), base_total + 3u);
+
+  // Drop the clients so the test process releases their fds cleanly.
+  // The server is already gone, so their reads would return EOF.
+  a->close();
+  a.reset();
+  b->close();
+  b.reset();
+  c->close();
+  c.reset();
+
+  // TearDown's shutdown() will no-op on the already-stopped server.
+}
+
 // Repeated connect/close cycles don't wedge the listener. Each cycle
 // relies on the lifecycle adapter firing RemoteClose on the dispatcher
 // and erasing the connection from active_connections_ /