11/* *
22 * Real-IO integration test for the SSE server transport.
33 *
4- * Verifies the first leg of the round-trip promised in PR #215's test
5- * plan: a GET on the configured SSE path returns an HTTP 200 stream that
6- * opens with an `event: endpoint` frame carrying a /callback/{id} URL
7- * the client can POST to. That exercises:
4+ * Verifies the full round-trip promised in PR #215's test plan:
85 *
6+ * 1. GET on the configured SSE path returns an HTTP 200 stream that
7+ * opens with an `event: endpoint` frame carrying a /callback/{id}
8+ * URL the client can POST to.
9+ * 2. POST on that /callback/{id} returns 202 Accepted, and a JSON-RPC
10+ * response produced on the POST connection is rerouted through the
11+ * SseSessionRegistry onto the original SSE stream (instead of being
12+ * framed as HTTP bytes on the POST socket).
13+ *
14+ * Covered wire paths:
915 * - HttpSseFilterChainFactory server-mode chain construction
10- * - HttpSseJsonRpcProtocolFilter's `onHeaders` handshake path, which
11- * writes the HTTP prelude + the endpoint event inline via
12- * connection().write()
13- * - SseSessionRegistry::registerSession being called against the live
14- * server connection
16+ * - HttpSseJsonRpcProtocolFilter's `onHeaders` handshake for both
17+ * GET /sse (writes HTTP prelude + endpoint event inline via
18+ * connection().write()) and POST /callback/{id} (writes 202 inline
19+ * and tags the filter instance with sse_callback_session_id_)
20+ * - HttpSseJsonRpcProtocolFilter's `onWrite` interception that pulls
21+ * JSON-RPC bytes off the POST connection's write chain and hands
22+ * them to SseSessionRegistry::sendResponse on the matching SSE
23+ * connection
1524 *
1625 * Design:
17- * - No McpServer bootstrap; the factory + a ConnectionImpl around a
18- * real TCP socketpair is the smallest harness that exercises the
19- * real write path (not a mock) while keeping the test self-contained.
26+ * - No McpServer bootstrap; two ConnectionImpls over real TCP
27+ * socketpairs sharing a single HttpSseFilterChainFactory is the
28+ * smallest harness that exercises the real routing path (not a mock)
29+ * while keeping the test self-contained. The shared factory is the
30+ * key: it owns the SseSessionRegistry so the POST-connection's
31+ * filter instance can find the SSE-connection's filter instance.
2032 * - Dispatcher-thread invariant: every mutation of the connection and
2133 * filter chain runs inside executeInDispatcher() because
2234 * ConnectionImpl's lifecycle methods assert isThreadSafe().
23- * - Cleanup closes the connection on the dispatcher thread too — tearing
24- * down ConnectionImpl from the test thread would trip its destructor
25- * assert.
26- *
27- * The POST /callback/{id} → 202 → response-routed-through-SSE leg of the
28- * round-trip requires either a full McpServer bootstrap or a test-only
29- * connection-tracking filter and is tracked as a follow-up.
35+ * - Cleanup closes both connections and drops the factory on the
36+ * dispatcher thread — tearing down ConnectionImpl from the test
37+ * thread would trip its destructor assert, and the factory's
38+ * transitively-owned SSE filters call registry.removeSession() from
39+ * their destructors which also asserts dispatcher-thread.
3040 */
3141
3242#include < chrono>
@@ -86,12 +96,39 @@ class SseTransportRoundTripTest : public test::RealIoTestBase {
8696 std::shared_ptr<stream_info::StreamInfo> stream_info;
8797 };
8898
89- Harness makeHarness (RecordingCallbacks & callbacks,
99+ Harness makeHarness (McpProtocolCallbacks & callbacks,
90100 const std::string& sse_path = " /sse" ,
91101 const std::string& rpc_path = " /mcp" ,
92102 const std::string& external_url = " " ) {
93- auto pair = createSocketPair ();
103+ auto factory = std::make_shared<filter::HttpSseFilterChainFactory>(
104+ *dispatcher_, callbacks,
105+ /* is_server=*/ true ,
106+ /* http_path=*/ rpc_path,
107+ /* http_host=*/ " localhost" ,
108+ /* use_sse=*/ true ,
109+ /* sse_path=*/ sse_path,
110+ /* rpc_path=*/ rpc_path,
111+ /* external_url=*/ external_url);
112+
113+ auto inner = makeConnectionForFactory (factory);
114+ return Harness{std::move (factory), std::move (inner.conn ),
115+ std::move (inner.peer ), std::move (inner.stream_info )};
116+ }
94117
118+ // Lightweight shape for attaching additional connections to an existing
119+ // factory. The POST /callback leg of the round-trip needs two live
120+ // server connections sharing the same factory — both so their filter
121+ // instances share the factory's SseSessionRegistry, and so tearing the
122+ // factory down cleans up both filter chains at once.
123+ struct ExtraConnection {
124+ std::unique_ptr<network::ServerConnection> conn;
125+ network::IoHandlePtr peer;
126+ std::shared_ptr<stream_info::StreamInfo> stream_info;
127+ };
128+
129+ ExtraConnection makeConnectionForFactory (
130+ const std::shared_ptr<filter::HttpSseFilterChainFactory>& factory) {
131+ auto pair = createSocketPair ();
95132 auto local = network::Address::parseInternetAddress (" 127.0.0.1" , 0 );
96133 auto remote = network::Address::parseInternetAddress (" 127.0.0.1" , 0 );
97134 auto socket = std::make_unique<network::ConnectionSocketImpl>(
@@ -102,19 +139,6 @@ class SseTransportRoundTripTest : public test::RealIoTestBase {
102139 auto conn = network::ConnectionImpl::createServerConnection (
103140 *dispatcher_, std::move (socket), std::move (transport), *stream_info);
104141
105- auto factory = std::make_shared<filter::HttpSseFilterChainFactory>(
106- *dispatcher_, callbacks,
107- /* is_server=*/ true ,
108- /* http_path=*/ rpc_path,
109- /* http_host=*/ " localhost" ,
110- /* use_sse=*/ true ,
111- /* sse_path=*/ sse_path,
112- /* rpc_path=*/ rpc_path,
113- /* external_url=*/ external_url);
114-
115- // Attach the factory's chain to the server connection and arm reads.
116- // Equivalent to what TcpActiveListener::createConnection does for
117- // real accepted sockets.
118142 auto * conn_impl = static_cast <network::ConnectionImpl*>(conn.get ());
119143 // createFilterChain() returning false would mean the factory couldn't
120144 // assemble the HTTP+SSE chain at all — nothing downstream is meaningful
@@ -123,8 +147,8 @@ class SseTransportRoundTripTest : public test::RealIoTestBase {
123147 << " factory declined to build a filter chain" ;
124148 conn_impl->filterManager ().initializeReadFilters ();
125149
126- return Harness{ std::move (factory), std::move (conn), std::move (pair.second ),
127- std::move (stream_info)};
150+ return ExtraConnection{ std::move (conn), std::move (pair.second ),
151+ std::move (stream_info)};
128152 }
129153
130154 // Simulate the HTTP client: push bytes onto the peer IoHandle so they
@@ -320,6 +344,140 @@ TEST_F(SseTransportRoundTripTest, ConfiguredSsePathIsHonored) {
320344 closeOnDispatcher (std::move (conn), std::move (factory));
321345}
322346
347+ // Full round-trip: GET /sse registers a session, POST /callback/{id}
348+ // gets 202 Accepted, and a JSON-RPC response produced on the POST
349+ // connection is rerouted through the SseSessionRegistry onto the SSE
350+ // stream. This is the contract PR #215's test plan called out as
351+ // pending — if the onWrite interception regresses, the response will
352+ // either leak back onto the POST connection as HTTP bytes or disappear
353+ // entirely.
354+ TEST_F (SseTransportRoundTripTest, PostCallbackRoutesResponseThroughSseStream) {
355+ // Test-only callbacks that synthesize a server response the moment a
356+ // JSON-RPC request is parsed off the POST /callback body. The real
357+ // McpServer emits responses via the JSON-RPC filter's encoder; a raw
358+ // connection().write() reaches the same write chain and therefore
359+ // the same HttpSseJsonRpcProtocolFilter::onWrite that does the
360+ // rerouting. The write must happen on the dispatcher thread (ConnImpl
361+ // asserts it), which is already the case — filter.onRequest fires
362+ // from inside the dispatcher's read path.
363+ class EchoingCallbacks : public McpProtocolCallbacks {
364+ public:
365+ void onRequest (const jsonrpc::Request& request) override {
366+ ++requests_seen;
367+ if (callback_conn) {
368+ OwnedBuffer b;
369+ // A minimal JSON-RPC response; the exact shape doesn't matter,
370+ // only that it's recognizable on the SSE peer.
371+ std::string resp =
372+ R"( {"jsonrpc":"2.0","id":)" +
373+ (holds_alternative<int64_t >(request.id )
374+ ? std::to_string (get<int64_t >(request.id ))
375+ : std::string (" \" " ) + get<std::string>(request.id ) + " \" " ) +
376+ R"( ,"result":{"echoed":true}})" + " \n " ;
377+ b.add (resp);
378+ callback_conn->write (b, false );
379+ }
380+ }
381+ void onNotification (const jsonrpc::Notification&) override {}
382+ void onResponse (const jsonrpc::Response&) override {}
383+ void onConnectionEvent (network::ConnectionEvent) override {}
384+ void onError (const Error&) override {}
385+
386+ network::Connection* callback_conn = nullptr ;
387+ std::atomic<int > requests_seen{0 };
388+ } callbacks;
389+
390+ std::shared_ptr<filter::HttpSseFilterChainFactory> factory;
391+ std::unique_ptr<network::ServerConnection> sse_conn;
392+ network::IoHandlePtr sse_peer;
393+ std::unique_ptr<network::ServerConnection> cb_conn;
394+ network::IoHandlePtr cb_peer;
395+
396+ // Bring up the SSE stream connection first so the session id the
397+ // factory generates is registered before any POST can land.
398+ executeInDispatcher ([&]() {
399+ auto h = makeHarness (callbacks);
400+ factory = std::move (h.factory );
401+ sse_conn = std::move (h.conn );
402+ sse_peer = std::move (h.peer );
403+
404+ writeClientBytes (*sse_peer,
405+ " GET /sse HTTP/1.1\r\n "
406+ " Host: localhost\r\n "
407+ " \r\n " );
408+ });
409+
410+ const std::string handshake = drainPeer (*sse_peer);
411+ std::smatch m;
412+ std::regex endpoint_re (R"( event:\s*endpoint\s*\ndata:\s*([^\r\n]+))" );
413+ ASSERT_TRUE (std::regex_search (handshake, m, endpoint_re))
414+ << " no endpoint event on SSE stream: " << handshake;
415+ const std::string callback_url = m[1 ].str ();
416+ const std::string marker = " callback/" ;
417+ const auto cb_pos = callback_url.rfind (marker);
418+ ASSERT_NE (cb_pos, std::string::npos);
419+ const std::string session_id = callback_url.substr (cb_pos + marker.size ());
420+ ASSERT_FALSE (session_id.empty ());
421+
422+ // Now bring up the POST /callback connection on the SAME factory and
423+ // wire its connection pointer into the echoing callbacks. Without
424+ // that pointer the callbacks can't emit a write, and the routing
425+ // assertion below would be trivially true of any config.
426+ executeInDispatcher ([&]() {
427+ auto extra = makeConnectionForFactory (factory);
428+ cb_conn = std::move (extra.conn );
429+ cb_peer = std::move (extra.peer );
430+ callbacks.callback_conn = cb_conn.get ();
431+
432+ const std::string body =
433+ R"( {"jsonrpc":"2.0","id":42,"method":"ping"})" ;
434+ std::string req;
435+ req += " POST /callback/" + session_id + " HTTP/1.1\r\n " ;
436+ req += " Host: localhost\r\n " ;
437+ req += " Content-Type: application/json\r\n " ;
438+ req += " Content-Length: " + std::to_string (body.size ()) + " \r\n " ;
439+ req += " \r\n " ;
440+ req += body;
441+ writeClientBytes (*cb_peer, req);
442+ });
443+
444+ // The POST connection gets back only the 202 Accepted handshake;
445+ // the response body is rerouted off this socket.
446+ const std::string cb_peer_bytes = drainPeer (*cb_peer);
447+ EXPECT_NE (cb_peer_bytes.find (" HTTP/1.1 202 Accepted" ), std::string::npos)
448+ << " expected 202 Accepted on POST connection, got: " << cb_peer_bytes;
449+ EXPECT_EQ (cb_peer_bytes.find (" \" echoed\" :true" ), std::string::npos)
450+ << " response leaked back onto POST connection instead of SSE stream: "
451+ << cb_peer_bytes;
452+
453+ EXPECT_EQ (callbacks.requests_seen .load (), 1 )
454+ << " JSON-RPC filter never dispatched the POSTed request" ;
455+
456+ // The actual routed-response assertion: after the 202, the JSON-RPC
457+ // response bytes produced by EchoingCallbacks must appear on the SSE
458+ // peer — that's the registry-based reroute working end-to-end.
459+ const std::string routed = drainPeer (*sse_peer);
460+ EXPECT_NE (routed.find (" \" echoed\" :true" ), std::string::npos)
461+ << " JSON-RPC response was not routed through SSE stream, got: "
462+ << routed;
463+
464+ // Tear everything down on the dispatcher thread. The factory outlives
465+ // both connections via its filters_ vector, so it must be dropped
466+ // last — dropping it off-thread would trip SseSessionRegistry asserts
467+ // as the SSE-codec filter destructors run.
468+ executeInDispatcher ([&]() {
469+ // Null the side-channel connection pointer before destroying cb_conn
470+ // so any stray read event on the socket doesn't race our teardown
471+ // through EchoingCallbacks.
472+ callbacks.callback_conn = nullptr ;
473+ cb_conn->close (network::ConnectionCloseType::NoFlush);
474+ cb_conn.reset ();
475+ sse_conn->close (network::ConnectionCloseType::NoFlush);
476+ sse_conn.reset ();
477+ factory.reset ();
478+ });
479+ }
480+
323481} // namespace
324482} // namespace integration
325483} // namespace mcp
0 commit comments