|
| 1 | +/** |
| 2 | + * Real-IO integration test for the SSE server transport. |
| 3 | + * |
| 4 | + * Verifies the first leg of the round-trip promised in PR #215's test |
| 5 | + * plan: a GET on the configured SSE path returns an HTTP 200 stream that |
| 6 | + * opens with an `event: endpoint` frame carrying a /callback/{id} URL |
| 7 | + * the client can POST to. That exercises: |
| 8 | + * |
| 9 | + * - HttpSseFilterChainFactory server-mode chain construction |
| 10 | + * - HttpSseJsonRpcProtocolFilter's `onHeaders` handshake path, which |
| 11 | + * writes the HTTP prelude + the endpoint event inline via |
| 12 | + * connection().write() |
| 13 | + * - SseSessionRegistry::registerSession being called against the live |
| 14 | + * server connection |
| 15 | + * |
| 16 | + * Design: |
| 17 | + * - No McpServer bootstrap; the factory + a ConnectionImpl around a |
| 18 | + * real TCP socketpair is the smallest harness that exercises the |
| 19 | + * real write path (not a mock) while keeping the test self-contained. |
| 20 | + * - Dispatcher-thread invariant: every mutation of the connection and |
| 21 | + * filter chain runs inside executeInDispatcher() because |
| 22 | + * ConnectionImpl's lifecycle methods assert isThreadSafe(). |
| 23 | + * - Cleanup closes the connection on the dispatcher thread too — tearing |
| 24 | + * down ConnectionImpl from the test thread would trip its destructor |
| 25 | + * assert. |
| 26 | + * |
| 27 | + * The POST /callback/{id} → 202 → response-routed-through-SSE leg of the |
| 28 | + * round-trip requires either a full McpServer bootstrap or a test-only |
| 29 | + * connection-tracking filter and is tracked as a follow-up. |
| 30 | + */ |
| 31 | + |
| 32 | +#include <chrono> |
| 33 | +#include <regex> |
| 34 | +#include <string> |
| 35 | +#include <thread> |
| 36 | + |
| 37 | +#include <gtest/gtest.h> |
| 38 | + |
| 39 | +#include "mcp/buffer.h" |
| 40 | +#include "mcp/filter/http_sse_filter_chain_factory.h" |
| 41 | +#include "mcp/mcp_connection_manager.h" |
| 42 | +#include "mcp/network/connection_impl.h" |
| 43 | +#include "mcp/network/socket_impl.h" |
| 44 | +#include "mcp/network/transport_socket.h" |
| 45 | +#include "mcp/stream_info/stream_info_impl.h" |
| 46 | +#include "mcp/types.h" |
| 47 | + |
| 48 | +#include "real_io_test_base.h" |
| 49 | + |
| 50 | +namespace mcp { |
| 51 | +namespace integration { |
| 52 | +namespace { |
| 53 | + |
| 54 | +using namespace std::chrono_literals; |
| 55 | + |
| 56 | +// Minimal McpProtocolCallbacks that just records what it saw. The SSE |
| 57 | +// handshake path never reaches these methods (onHeaders writes the |
| 58 | +// endpoint event before any JSON-RPC parsing happens), so they're here |
| 59 | +// only to satisfy the interface. |
| 60 | +class RecordingCallbacks : public McpProtocolCallbacks { |
| 61 | + public: |
| 62 | + void onRequest(const jsonrpc::Request& request) override { |
| 63 | + requests_.push_back(request); |
| 64 | + } |
| 65 | + void onNotification(const jsonrpc::Notification& n) override { |
| 66 | + notifications_.push_back(n); |
| 67 | + } |
| 68 | + void onResponse(const jsonrpc::Response&) override {} |
| 69 | + void onConnectionEvent(network::ConnectionEvent) override {} |
| 70 | + void onError(const Error&) override {} |
| 71 | + |
| 72 | + std::vector<jsonrpc::Request> requests_; |
| 73 | + std::vector<jsonrpc::Notification> notifications_; |
| 74 | +}; |
| 75 | + |
| 76 | +class SseTransportRoundTripTest : public test::RealIoTestBase { |
| 77 | + protected: |
| 78 | + // Build a server-side ConnectionImpl wrapped around one end of a real |
| 79 | + // TCP socketpair, with the factory's full HTTP+SSE filter chain |
| 80 | + // attached. The test holds the peer IoHandle so it can simulate a raw |
| 81 | + // HTTP client: write GET bytes in, read the server response out. |
| 82 | + struct Harness { |
| 83 | + std::shared_ptr<filter::HttpSseFilterChainFactory> factory; |
| 84 | + std::unique_ptr<network::ServerConnection> conn; |
| 85 | + network::IoHandlePtr peer; |
| 86 | + std::shared_ptr<stream_info::StreamInfo> stream_info; |
| 87 | + }; |
| 88 | + |
| 89 | + Harness makeHarness(RecordingCallbacks& callbacks, |
| 90 | + const std::string& sse_path = "/sse", |
| 91 | + const std::string& rpc_path = "/mcp", |
| 92 | + const std::string& external_url = "") { |
| 93 | + auto pair = createSocketPair(); |
| 94 | + |
| 95 | + auto local = network::Address::parseInternetAddress("127.0.0.1", 0); |
| 96 | + auto remote = network::Address::parseInternetAddress("127.0.0.1", 0); |
| 97 | + auto socket = std::make_unique<network::ConnectionSocketImpl>( |
| 98 | + std::move(pair.first), local, remote); |
| 99 | + auto transport = std::make_unique<network::RawBufferTransportSocket>(); |
| 100 | + auto stream_info = std::make_shared<stream_info::StreamInfoImpl>(); |
| 101 | + |
| 102 | + auto conn = network::ConnectionImpl::createServerConnection( |
| 103 | + *dispatcher_, std::move(socket), std::move(transport), *stream_info); |
| 104 | + |
| 105 | + auto factory = std::make_shared<filter::HttpSseFilterChainFactory>( |
| 106 | + *dispatcher_, callbacks, |
| 107 | + /*is_server=*/true, |
| 108 | + /*http_path=*/rpc_path, |
| 109 | + /*http_host=*/"localhost", |
| 110 | + /*use_sse=*/true, |
| 111 | + /*sse_path=*/sse_path, |
| 112 | + /*rpc_path=*/rpc_path, |
| 113 | + /*external_url=*/external_url); |
| 114 | + |
| 115 | + // Attach the factory's chain to the server connection and arm reads. |
| 116 | + // Equivalent to what TcpActiveListener::createConnection does for |
| 117 | + // real accepted sockets. |
| 118 | + auto* conn_impl = static_cast<network::ConnectionImpl*>(conn.get()); |
| 119 | + // createFilterChain() returning false would mean the factory couldn't |
| 120 | + // assemble the HTTP+SSE chain at all — nothing downstream is meaningful |
| 121 | + // if that happens, so fail loudly right here. |
| 122 | + EXPECT_TRUE(factory->createFilterChain(conn_impl->filterManager())) |
| 123 | + << "factory declined to build a filter chain"; |
| 124 | + conn_impl->filterManager().initializeReadFilters(); |
| 125 | + |
| 126 | + return Harness{std::move(factory), std::move(conn), std::move(pair.second), |
| 127 | + std::move(stream_info)}; |
| 128 | + } |
| 129 | + |
| 130 | + // Simulate the HTTP client: push bytes onto the peer IoHandle so they |
| 131 | + // arrive on the server connection's read path. |
| 132 | + void writeClientBytes(network::IoHandle& peer, const std::string& data) { |
| 133 | + OwnedBuffer buf; |
| 134 | + buf.add(data); |
| 135 | + auto r = peer.write(buf); |
| 136 | + ASSERT_TRUE(r.ok()) << "peer.write failed: errno=" << errno; |
| 137 | + } |
| 138 | + |
| 139 | + // Read whatever the server has written back onto the peer socket, |
| 140 | + // polling up to `budget` milliseconds. Returns as soon as we've got |
| 141 | + // something and nothing more is buffered — the loopback pair is |
| 142 | + // effectively instant once the dispatcher pumps the write event. |
| 143 | + std::string drainPeer(network::IoHandle& peer, |
| 144 | + std::chrono::milliseconds budget = 2000ms) { |
| 145 | + std::string out; |
| 146 | + const auto deadline = std::chrono::steady_clock::now() + budget; |
| 147 | + while (std::chrono::steady_clock::now() < deadline) { |
| 148 | + OwnedBuffer buf; |
| 149 | + auto r = peer.read(buf, /*max_length=*/4096); |
| 150 | + if (r.ok() && *r > 0) { |
| 151 | + out.append(buf.toString()); |
| 152 | + } else if (!out.empty()) { |
| 153 | + return out; |
| 154 | + } else { |
| 155 | + std::this_thread::sleep_for(5ms); |
| 156 | + } |
| 157 | + } |
| 158 | + return out; |
| 159 | + } |
| 160 | + |
| 161 | + // Tear down the connection and factory on the dispatcher thread. |
| 162 | + // ConnectionImpl's destructor asserts isThreadSafe(), and the factory's |
| 163 | + // shared filter vector transitively owns the SSE-codec filter whose |
| 164 | + // destructor calls SseSessionRegistry::removeSession — that assert also |
| 165 | + // fires if it runs off the dispatcher thread. |
| 166 | + void closeOnDispatcher( |
| 167 | + std::unique_ptr<network::ServerConnection> conn, |
| 168 | + std::shared_ptr<filter::HttpSseFilterChainFactory> factory) { |
| 169 | + executeInDispatcher([&]() { |
| 170 | + conn->close(network::ConnectionCloseType::NoFlush); |
| 171 | + conn.reset(); |
| 172 | + factory.reset(); |
| 173 | + }); |
| 174 | + } |
| 175 | +}; |
| 176 | + |
| 177 | +// The SSE handshake: client GETs the configured SSE path, server writes |
| 178 | +// HTTP 200 + `event: endpoint\ndata: <callback_url>\n\n`. Verifies both |
| 179 | +// the HTTP status line and the event framing, and that the callback URL |
| 180 | +// is shaped the way PR #215 promised (has `/callback/` plus a non-empty |
| 181 | +// session ID). |
| 182 | +TEST_F(SseTransportRoundTripTest, SseGetAnnouncesCallbackEndpoint) { |
| 183 | + RecordingCallbacks callbacks; |
| 184 | + |
| 185 | + std::unique_ptr<network::ServerConnection> conn; |
| 186 | + network::IoHandlePtr peer; |
| 187 | + std::shared_ptr<filter::HttpSseFilterChainFactory> factory; |
| 188 | + |
| 189 | + executeInDispatcher([&]() { |
| 190 | + auto h = makeHarness(callbacks); |
| 191 | + conn = std::move(h.conn); |
| 192 | + peer = std::move(h.peer); |
| 193 | + factory = std::move(h.factory); |
| 194 | + |
| 195 | + // Push the raw GET request onto the peer socket. The server |
| 196 | + // dispatcher will see a read event and drive it through the filter |
| 197 | + // chain; onHeaders writes the handshake bytes back. |
| 198 | + writeClientBytes( |
| 199 | + *peer, |
| 200 | + "GET /sse HTTP/1.1\r\n" |
| 201 | + "Host: localhost\r\n" |
| 202 | + "Accept: text/event-stream\r\n" |
| 203 | + "\r\n"); |
| 204 | + }); |
| 205 | + |
| 206 | + const std::string wire = drainPeer(*peer); |
| 207 | + |
| 208 | + // HTTP status line: the filter writes a 200 OK prelude before the |
| 209 | + // endpoint event. If this fails, the handshake didn't run at all. |
| 210 | + EXPECT_NE(wire.find("HTTP/1.1 200"), std::string::npos) |
| 211 | + << "expected HTTP 200 status line, got: " << wire; |
| 212 | + EXPECT_NE(wire.find("Content-Type: text/event-stream"), std::string::npos) |
| 213 | + << "expected SSE content-type header, got: " << wire; |
| 214 | + |
| 215 | + // Endpoint frame: `event: endpoint\ndata: <url>\n\n`. Parse the URL |
| 216 | + // out of it rather than hard-coding — the session ID is generated by |
| 217 | + // SseSessionRegistry so we can only check its shape. |
| 218 | + std::smatch m; |
| 219 | + std::regex endpoint_re(R"(event:\s*endpoint\s*\ndata:\s*([^\r\n]+))"); |
| 220 | + ASSERT_TRUE(std::regex_search(wire, m, endpoint_re)) |
| 221 | + << "no endpoint event in handshake bytes: " << wire; |
| 222 | + |
| 223 | + // No external_url set → factory advertises a relative URL of the |
| 224 | + // form `callback/<id>`. With external_url set, it's absolute and |
| 225 | + // contains `/callback/<id>`. Accept either shape. |
| 226 | + const std::string callback_url = m[1].str(); |
| 227 | + EXPECT_NE(callback_url.find("callback/"), std::string::npos) |
| 228 | + << "endpoint URL should contain callback/, got: " << callback_url; |
| 229 | + |
| 230 | + // Session ID is the tail after the final "callback/" — should be |
| 231 | + // non-empty regardless of whether the URL is relative or absolute. |
| 232 | + const std::string callback_marker = "callback/"; |
| 233 | + auto cb_pos = callback_url.rfind(callback_marker); |
| 234 | + ASSERT_NE(cb_pos, std::string::npos); |
| 235 | + const std::string session_id = |
| 236 | + callback_url.substr(cb_pos + callback_marker.size()); |
| 237 | + EXPECT_FALSE(session_id.empty()) |
| 238 | + << "session ID in callback URL is empty: " << callback_url; |
| 239 | + |
| 240 | + closeOnDispatcher(std::move(conn), std::move(factory)); |
| 241 | +} |
| 242 | + |
| 243 | +// When `external_url` is configured (reverse-proxy deployment), the |
| 244 | +// callback URL advertised in the endpoint event should use that base |
| 245 | +// instead of being derived from the Host header. This exercises the |
| 246 | +// McpServerConfig → factory wiring landed in PR #215. |
| 247 | +TEST_F(SseTransportRoundTripTest, ExternalUrlIsAdvertisedInEndpointEvent) { |
| 248 | + RecordingCallbacks callbacks; |
| 249 | + |
| 250 | + std::unique_ptr<network::ServerConnection> conn; |
| 251 | + network::IoHandlePtr peer; |
| 252 | + std::shared_ptr<filter::HttpSseFilterChainFactory> factory; |
| 253 | + |
| 254 | + executeInDispatcher([&]() { |
| 255 | + auto h = makeHarness(callbacks, /*sse_path=*/"/sse", /*rpc_path=*/"/mcp", |
| 256 | + /*external_url=*/"https://proxy.example.com/mcp"); |
| 257 | + conn = std::move(h.conn); |
| 258 | + peer = std::move(h.peer); |
| 259 | + factory = std::move(h.factory); |
| 260 | + |
| 261 | + // Host header intentionally set to something different from the |
| 262 | + // external URL — if the factory fell back to Host-derived URLs, |
| 263 | + // we'd see "localhost" in the callback URL instead of the proxy. |
| 264 | + writeClientBytes( |
| 265 | + *peer, |
| 266 | + "GET /sse HTTP/1.1\r\n" |
| 267 | + "Host: internal-host:8080\r\n" |
| 268 | + "\r\n"); |
| 269 | + }); |
| 270 | + |
| 271 | + const std::string wire = drainPeer(*peer); |
| 272 | + |
| 273 | + std::smatch m; |
| 274 | + std::regex endpoint_re(R"(event:\s*endpoint\s*\ndata:\s*([^\r\n]+))"); |
| 275 | + ASSERT_TRUE(std::regex_search(wire, m, endpoint_re)) |
| 276 | + << "no endpoint event: " << wire; |
| 277 | + |
| 278 | + const std::string callback_url = m[1].str(); |
| 279 | + EXPECT_NE(callback_url.find("proxy.example.com"), std::string::npos) |
| 280 | + << "external_url should be in advertised callback URL, got: " |
| 281 | + << callback_url; |
| 282 | + EXPECT_EQ(callback_url.find("internal-host"), std::string::npos) |
| 283 | + << "Host header leaked into callback URL instead of external_url: " |
| 284 | + << callback_url; |
| 285 | + |
| 286 | + closeOnDispatcher(std::move(conn), std::move(factory)); |
| 287 | +} |
| 288 | + |
| 289 | +// Configuring a non-default SSE path (e.g. /events for a legacy client) |
| 290 | +// should change only where the GET is accepted, not the /callback/{id} |
| 291 | +// shape of the announced endpoint. Guards against a regression where |
| 292 | +// the handshake path got hardcoded alongside the config field. |
| 293 | +TEST_F(SseTransportRoundTripTest, ConfiguredSsePathIsHonored) { |
| 294 | + RecordingCallbacks callbacks; |
| 295 | + |
| 296 | + std::unique_ptr<network::ServerConnection> conn; |
| 297 | + network::IoHandlePtr peer; |
| 298 | + std::shared_ptr<filter::HttpSseFilterChainFactory> factory; |
| 299 | + |
| 300 | + executeInDispatcher([&]() { |
| 301 | + auto h = makeHarness(callbacks, /*sse_path=*/"/events", /*rpc_path=*/"/rpc", |
| 302 | + /*external_url=*/""); |
| 303 | + conn = std::move(h.conn); |
| 304 | + peer = std::move(h.peer); |
| 305 | + factory = std::move(h.factory); |
| 306 | + |
| 307 | + writeClientBytes( |
| 308 | + *peer, |
| 309 | + "GET /events HTTP/1.1\r\n" |
| 310 | + "Host: localhost\r\n" |
| 311 | + "\r\n"); |
| 312 | + }); |
| 313 | + |
| 314 | + const std::string wire = drainPeer(*peer); |
| 315 | + EXPECT_NE(wire.find("HTTP/1.1 200"), std::string::npos) |
| 316 | + << "configured SSE path /events did not return 200, got: " << wire; |
| 317 | + EXPECT_NE(wire.find("event: endpoint"), std::string::npos) |
| 318 | + << "no endpoint event on configured /events path, got: " << wire; |
| 319 | + |
| 320 | + closeOnDispatcher(std::move(conn), std::move(factory)); |
| 321 | +} |
| 322 | + |
| 323 | +} // namespace |
| 324 | +} // namespace integration |
| 325 | +} // namespace mcp |
0 commit comments