Skip to content

Commit 2ef57fb

Browse files
gophergogogophergogo
authored andcommitted
Implement SSE server transport with per-factory session registry (#215)
MCP's SSE transport splits each JSON-RPC request/response across two TCP connections on the server: a long-lived GET {sse_path} stream, and short POST /callback/{session_id} connections that carry the request body. The server returns 202 Accepted on the POST and routes the real JSON-RPC response back through the SSE stream registered under the matching session ID. This commit lands the server side of that dance inside HttpSseJsonRpcProtocolFilter. Design notes: - SseSessionRegistry is owned by HttpSseFilterChainFactory, not a process-wide singleton. The upstream draft used a global singleton with a std::mutex; that conflicts with the project rule to prefer state machines and dispatcher-thread invariants over mutex-guarded shared state, leaks session IDs across McpServer instances in the same process, and encourages cross-thread connection.write() calls. A per-factory registry keeps lifetime bounded by the server, and since all filter callbacks on a given server already fire on one dispatcher thread, the map needs no lock — only an assert on each public method to keep that invariant honest if someone later splits the server across workers. - The filter, not the registry, holds the session-ID-to-connection binding lifecycle: registerSession in onHeaders for GET {sse_path}, removeSession in the filter destructor when the SSE stream connection tears down. The destructor runs on the dispatcher thread per the filter/connection contract, so a POST /callback arriving between "SSE connection closed" and "filter destructed" is handled by an already-empty registry lookup returning false — no use-after-free window. - onHeaders sends the SSE prelude (HTTP 200 + text/event-stream headers + the "event: endpoint" line) via connection().write() inline, same for the POST's 202 Accepted. A sse_writing_handshake_ pass-through flag keeps our own onWrite from re-framing those raw bytes as an SSE event when the write chain calls back into us — the alternative (posting through the dispatcher) would race the client reading the handshake before we could register the session in the registry. - onWrite intercepts the JSON-RPC response when sse_callback_session_id_ is set, drains the buffer, hands the JSON to the registry, and returns StopIteration so the response doesn't get written back to the POST connection (which already got its 202). - The filter ctor gains four trailing defaulted params (sse_path, rpc_path, external_url, SseSessionRegistry*). All four default to the values that match pre-existing behavior, so no caller that doesn't care about SSE server transport needs to change. - Factory ctor/destructor are moved out-of-line so the unique_ptr member of the forward-declared SseSessionRegistry class compiles cleanly from the header. The existing HttpSse/HttpsSse/SseCodec/HttpCodecSse test suite (10 cases) continues to pass unchanged — the new transport is opt-in via the GET {sse_path} / POST /callback/{id} URL pattern and back-compat factories that don't use either path see no behavioral change. Integration test for the new round-trip lands in the next commit.
1 parent f82ecfb commit 2ef57fb

2 files changed

Lines changed: 351 additions & 22 deletions

File tree

include/mcp/filter/http_sse_filter_chain_factory.h

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class McpProtocolCallbacks;
1818
namespace filter {
1919
class HttpRoutingFilter;
2020
class MetricsFilter;
21+
class SseSessionRegistry;
2122
} // namespace filter
2223
} // namespace mcp
2324

@@ -83,16 +84,11 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
8384
bool use_sse = true,
8485
const std::string& sse_path = "/sse",
8586
const std::string& rpc_path = "/mcp",
86-
const std::string& external_url = "")
87-
: dispatcher_(dispatcher),
88-
message_callbacks_(message_callbacks),
89-
is_server_(is_server),
90-
http_path_(http_path),
91-
http_host_(http_host),
92-
use_sse_(use_sse),
93-
sse_path_(sse_path),
94-
rpc_path_(rpc_path),
95-
external_url_(external_url) {}
87+
const std::string& external_url = "");
88+
89+
// Destructor defined out-of-line so the unique_ptr<SseSessionRegistry>
90+
// member can use the incomplete forward-declared type in this header.
91+
~HttpSseFilterChainFactory();
9692

9793
/**
9894
* Create filter chain for the connection
@@ -189,6 +185,16 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
189185
std::string external_url_; // External URL for absolute SSE callback URLs
190186
mutable bool enable_metrics_ = true; // Enable metrics by default
191187

188+
// SSE session registry — maps session IDs to the connections that are
189+
// streaming SSE to each client, so POST /callback/{id} handlers can
190+
// route a JSON-RPC response back through the correct stream. Lazily
191+
// constructed on the first server-side filter chain creation; stays
192+
// null for client-mode factories. Owned here (not a global singleton)
193+
// so each McpServer instance has an isolated registry and lifetime is
194+
// bounded by the factory, not process lifetime. Mutable because
195+
// createFilterChain is const on the base class.
196+
mutable std::unique_ptr<SseSessionRegistry> sse_registry_;
197+
192198
// Store filters for lifetime management
193199
mutable std::vector<network::FilterSharedPtr> filters_;
194200

0 commit comments

Comments
 (0)