|
| 1 | +/** |
| 2 | + * Unit tests for mcp::filter::SseSessionRegistry. |
| 3 | + * |
| 4 | + * The registry is the piece that maps an SSE session ID to the long- |
| 5 | + * lived SSE-stream connection, so a short-lived POST /callback/{id} |
| 6 | + * handler can hand its JSON-RPC response back through the matching |
| 7 | + * stream. These tests verify, in order of complexity: |
| 8 | + * |
| 9 | + * 1. Pure state-machine behavior (unique IDs, removal, lookups) — no |
| 10 | + * real Connection needed, so we store a null pointer and never |
| 11 | + * call sendResponse on those slots. |
| 12 | + * 2. sendResponse against a real socketpair-backed ConnectionImpl — |
| 13 | + * we read the bytes off the peer IoHandle and confirm they arrived |
| 14 | + * on the wire, which exercises the filter/write path rather than |
| 15 | + * mocking it out. |
| 16 | + * |
| 17 | + * Dispatcher invariants: every registry method asserts that it runs on |
| 18 | + * the dispatcher thread, so every test body is wrapped in |
| 19 | + * executeInDispatcher() to keep that contract honest. |
| 20 | + */ |
| 21 | + |
| 22 | +#include <chrono> |
| 23 | +#include <string> |
| 24 | +#include <thread> |
| 25 | + |
| 26 | +#include <gtest/gtest.h> |
| 27 | + |
| 28 | +#include "mcp/buffer.h" |
| 29 | +#include "mcp/filter/sse_session_registry.h" |
| 30 | +#include "mcp/network/connection_impl.h" |
| 31 | +#include "mcp/network/socket_impl.h" |
| 32 | +#include "mcp/network/transport_socket.h" |
| 33 | +#include "mcp/stream_info/stream_info_impl.h" |
| 34 | + |
| 35 | +#include "../integration/real_io_test_base.h" |
| 36 | + |
| 37 | +namespace mcp { |
| 38 | +namespace filter { |
| 39 | +namespace { |
| 40 | + |
| 41 | +using namespace std::chrono_literals; |
| 42 | + |
| 43 | +class SseSessionRegistryTest : public test::RealIoTestBase { |
| 44 | + protected: |
| 45 | + // Build a server-side ConnectionImpl wrapped around one end of a real |
| 46 | + // TCP socketpair, plus the raw peer IoHandle so the test can observe |
| 47 | + // bytes that the connection writes. Must be called from the |
| 48 | + // dispatcher thread because socket creation and ConnectionImpl |
| 49 | + // construction both assert it. |
| 50 | + struct Endpoint { |
| 51 | + std::unique_ptr<network::ServerConnection> conn; |
| 52 | + network::IoHandlePtr peer; |
| 53 | + std::shared_ptr<stream_info::StreamInfo> stream_info; |
| 54 | + }; |
| 55 | + |
| 56 | + Endpoint makeLiveConnection() { |
| 57 | + auto pair = createSocketPair(); |
| 58 | + auto local = network::Address::parseInternetAddress("127.0.0.1", 0); |
| 59 | + auto remote = network::Address::parseInternetAddress("127.0.0.1", 0); |
| 60 | + auto socket = std::make_unique<network::ConnectionSocketImpl>( |
| 61 | + std::move(pair.first), local, remote); |
| 62 | + auto transport = std::make_unique<network::RawBufferTransportSocket>(); |
| 63 | + auto stream_info = std::make_shared<stream_info::StreamInfoImpl>(); |
| 64 | + auto conn = network::ConnectionImpl::createServerConnection( |
| 65 | + *dispatcher_, std::move(socket), std::move(transport), *stream_info); |
| 66 | + return Endpoint{std::move(conn), std::move(pair.second), |
| 67 | + std::move(stream_info)}; |
| 68 | + } |
| 69 | + |
| 70 | + // Read up to `budget` milliseconds of whatever the peer has buffered. |
| 71 | + // IoHandle::read fills an OwnedBuffer; we drain it to a std::string so |
| 72 | + // tests can string-match the payload the registry routed. |
| 73 | + std::string drainPeer(network::IoHandle& peer, |
| 74 | + std::chrono::milliseconds budget = 1000ms) { |
| 75 | + std::string out; |
| 76 | + const auto deadline = std::chrono::steady_clock::now() + budget; |
| 77 | + while (std::chrono::steady_clock::now() < deadline) { |
| 78 | + OwnedBuffer buf; |
| 79 | + auto r = peer.read(buf, /*max_length=*/4096); |
| 80 | + if (r.ok() && *r > 0) { |
| 81 | + out.append(buf.toString()); |
| 82 | + } else if (!out.empty()) { |
| 83 | + // Got something and nothing more is coming right now — good |
| 84 | + // enough. Bail before the full budget elapses. |
| 85 | + return out; |
| 86 | + } else { |
| 87 | + std::this_thread::sleep_for(5ms); |
| 88 | + } |
| 89 | + } |
| 90 | + return out; |
| 91 | + } |
| 92 | +}; |
| 93 | + |
| 94 | +// ---------------------------------------------------------------------- |
| 95 | +// State-machine tests (no live Connection needed). |
| 96 | +// ---------------------------------------------------------------------- |
| 97 | + |
| 98 | +TEST_F(SseSessionRegistryTest, RegisterReturnsUniqueIds) { |
| 99 | + executeInDispatcher([this]() { |
| 100 | + SseSessionRegistry registry(*dispatcher_); |
| 101 | + |
| 102 | + const std::string a = registry.registerSession(nullptr); |
| 103 | + const std::string b = registry.registerSession(nullptr); |
| 104 | + const std::string c = registry.registerSession(nullptr); |
| 105 | + |
| 106 | + EXPECT_NE(a, b); |
| 107 | + EXPECT_NE(b, c); |
| 108 | + EXPECT_NE(a, c); |
| 109 | + EXPECT_EQ(registry.sessionCount(), 3u); |
| 110 | + EXPECT_TRUE(registry.hasSession(a)); |
| 111 | + EXPECT_TRUE(registry.hasSession(b)); |
| 112 | + EXPECT_TRUE(registry.hasSession(c)); |
| 113 | + }); |
| 114 | +} |
| 115 | + |
| 116 | +TEST_F(SseSessionRegistryTest, RemoveSessionClearsLookup) { |
| 117 | + executeInDispatcher([this]() { |
| 118 | + SseSessionRegistry registry(*dispatcher_); |
| 119 | + const std::string id = registry.registerSession(nullptr); |
| 120 | + ASSERT_TRUE(registry.hasSession(id)); |
| 121 | + |
| 122 | + registry.removeSession(id); |
| 123 | + EXPECT_FALSE(registry.hasSession(id)); |
| 124 | + EXPECT_EQ(registry.sessionCount(), 0u); |
| 125 | + }); |
| 126 | +} |
| 127 | + |
| 128 | +TEST_F(SseSessionRegistryTest, RemoveUnknownSessionIsSafeNoop) { |
| 129 | + executeInDispatcher([this]() { |
| 130 | + SseSessionRegistry registry(*dispatcher_); |
| 131 | + // Filter destructor can call removeSession after an already-cleaned |
| 132 | + // entry (e.g. same session removed twice on teardown). Must not |
| 133 | + // crash or change state. |
| 134 | + registry.removeSession("never_registered"); |
| 135 | + EXPECT_EQ(registry.sessionCount(), 0u); |
| 136 | + }); |
| 137 | +} |
| 138 | + |
| 139 | +TEST_F(SseSessionRegistryTest, SendResponseUnknownSessionReturnsFalse) { |
| 140 | + executeInDispatcher([this]() { |
| 141 | + SseSessionRegistry registry(*dispatcher_); |
| 142 | + // Contract: the POST /callback handler expects sendResponse to |
| 143 | + // return false if the SSE stream is already gone, so it can drop |
| 144 | + // the response rather than pretending it was delivered. |
| 145 | + EXPECT_FALSE(registry.sendResponse("vanished", "{}")); |
| 146 | + }); |
| 147 | +} |
| 148 | + |
| 149 | +TEST_F(SseSessionRegistryTest, RemovedSessionNoLongerRoutes) { |
| 150 | + executeInDispatcher([this]() { |
| 151 | + SseSessionRegistry registry(*dispatcher_); |
| 152 | + const std::string id = registry.registerSession(nullptr); |
| 153 | + registry.removeSession(id); |
| 154 | + // After removal, any late POST /callback must not dereference the |
| 155 | + // stale pointer. Verified by: sendResponse returns false and |
| 156 | + // doesn't touch the null connection. |
| 157 | + EXPECT_FALSE(registry.sendResponse(id, "{}")); |
| 158 | + }); |
| 159 | +} |
| 160 | + |
| 161 | +TEST_F(SseSessionRegistryTest, MultipleSessionsAreIndependent) { |
| 162 | + executeInDispatcher([this]() { |
| 163 | + SseSessionRegistry registry(*dispatcher_); |
| 164 | + const std::string a = registry.registerSession(nullptr); |
| 165 | + const std::string b = registry.registerSession(nullptr); |
| 166 | + ASSERT_EQ(registry.sessionCount(), 2u); |
| 167 | + |
| 168 | + // Dropping one leaves the other intact — a server with multiple |
| 169 | + // live clients must not lose the rest when one disconnects. |
| 170 | + registry.removeSession(a); |
| 171 | + EXPECT_FALSE(registry.hasSession(a)); |
| 172 | + EXPECT_TRUE(registry.hasSession(b)); |
| 173 | + EXPECT_EQ(registry.sessionCount(), 1u); |
| 174 | + }); |
| 175 | +} |
| 176 | + |
| 177 | +// ---------------------------------------------------------------------- |
| 178 | +// Live-write test (real ConnectionImpl over a TCP socketpair). |
| 179 | +// ---------------------------------------------------------------------- |
| 180 | + |
| 181 | +TEST_F(SseSessionRegistryTest, SendResponseWritesBytesToRegisteredConnection) { |
| 182 | + network::IoHandlePtr peer; |
| 183 | + std::unique_ptr<network::ServerConnection> conn; |
| 184 | + std::shared_ptr<stream_info::StreamInfo> stream_info; |
| 185 | + std::string session_id; |
| 186 | + |
| 187 | + executeInDispatcher([&]() { |
| 188 | + auto ep = makeLiveConnection(); |
| 189 | + peer = std::move(ep.peer); |
| 190 | + conn = std::move(ep.conn); |
| 191 | + stream_info = std::move(ep.stream_info); |
| 192 | + |
| 193 | + SseSessionRegistry registry(*dispatcher_); |
| 194 | + session_id = registry.registerSession(conn.get()); |
| 195 | + |
| 196 | + const std::string payload = |
| 197 | + R"({"jsonrpc":"2.0","id":1,"result":{"ok":true}})"; |
| 198 | + EXPECT_TRUE(registry.sendResponse(session_id, payload)); |
| 199 | + |
| 200 | + // Registry lives only for the duration of this lambda; the |
| 201 | + // connection outlives it. That mirrors production where the |
| 202 | + // factory (owning the registry) and the connection have |
| 203 | + // independent lifetimes — removeSession must be called if the |
| 204 | + // registry outlives the connection, but here we tear the |
| 205 | + // registry down first, which is always safe. |
| 206 | + }); |
| 207 | + |
| 208 | + // On the dispatcher thread the write queued bytes but the actual |
| 209 | + // send happens asynchronously when the socket reports writable. |
| 210 | + // Drain with a short budget — the loopback pair is effectively |
| 211 | + // instant once the write event fires. |
| 212 | + const std::string wire = drainPeer(*peer, 1000ms); |
| 213 | + EXPECT_NE(wire.find(R"("result":{"ok":true})"), std::string::npos) |
| 214 | + << "expected JSON payload on peer socket, got: " << wire; |
| 215 | + |
| 216 | + // Close the connection on the dispatcher thread so teardown is |
| 217 | + // ordered correctly — freeing ConnectionImpl from the test thread |
| 218 | + // would trip the isThreadSafe() assert in its destructor. |
| 219 | + executeInDispatcher([&]() { |
| 220 | + conn->close(network::ConnectionCloseType::NoFlush); |
| 221 | + conn.reset(); |
| 222 | + }); |
| 223 | +} |
| 224 | + |
| 225 | +} // namespace |
| 226 | +} // namespace filter |
| 227 | +} // namespace mcp |
0 commit comments