From dc3d2d9e39131226681ae5d3f8fc917eb7247da3 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 16:50:22 -0700 Subject: [PATCH 1/5] Rename default SSE path to /sse and add external_url config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two small config changes on McpServerConfig in preparation for the SSE server transport landing on top of this PR stack: - Default http_sse_path changes from /events to /sse. /events does not convey intent (SSE vs. arbitrary event stream), and /sse matches what the Python/TypeScript MCP SDKs use, keeping the cross-language wire contract consistent. - Add external_url. The SSE GET handler has to announce a callback URL in the endpoint event; when the server sits behind a reverse proxy that rewrites scheme or path (e.g. https:// → http:// + /mcp/ prefix), the Host header alone doesn't reconstruct the client-visible URL. external_url is the explicit override; empty means derive from Host. No behavior change yet — the factory that consumes these lands in a later commit on this branch. --- include/mcp/server/mcp_server.h | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/include/mcp/server/mcp_server.h b/include/mcp/server/mcp_server.h index acaa1eaf..bce793f2 100644 --- a/include/mcp/server/mcp_server.h +++ b/include/mcp/server/mcp_server.h @@ -88,8 +88,15 @@ struct McpServerConfig : public application::ApplicationBase::Config { // HTTP/SSE specific configuration std::string http_rpc_path = "/rpc"; // Path for JSON-RPC over HTTP - std::string http_sse_path = "/events"; // Path for SSE event stream + std::string http_sse_path = "/sse"; // Path for SSE event stream std::string http_health_path = "/health"; // Path for health check endpoint + // Absolute URL the server is reachable at from the client's perspective + // (scheme + host + port + optional path prefix). Used to build the + // endpoint-event callback URL advertised on GET /sse. Leave empty to + // have the server derive a URL from the incoming Host header; set + // explicitly when the server sits behind a reverse proxy that rewrites + // scheme or path so clients don't try to POST back to an internal URL. + std::string external_url; // Session management size_t max_sessions = 100; From c7c669c31c116d8c42de4ffa168d602b9cb3c629 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 16:52:16 -0700 Subject: [PATCH 2/5] Add SSE/RPC path and external_url params to filter chain factory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend HttpSseFilterChainFactory's constructor with three optional server-side parameters so McpServer can propagate its configured endpoint paths down into the filter chain without the factory having to reach back into McpServerConfig: - sse_path: server-side SSE endpoint (matches McpServerConfig http_sse_path, default "/sse") - rpc_path: server-side JSON-RPC endpoint (default "/mcp") - external_url: absolute URL the server is reachable at from the client's perspective; used to build the endpoint-event callback URL advertised on GET /sse. Empty means derive the URL from the incoming Host header. Defaulted so existing callers (tests, client-mode construction, example apps that only care about http_path/http_host) keep compiling unchanged. The factory does not consume these fields yet — that wiring lands in a follow-up commit once the SSE server transport itself goes in on top of this stack. --- .../filter/http_sse_filter_chain_factory.h | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/include/mcp/filter/http_sse_filter_chain_factory.h b/include/mcp/filter/http_sse_filter_chain_factory.h index 0f67c9f1..fa6bff23 100644 --- a/include/mcp/filter/http_sse_filter_chain_factory.h +++ b/include/mcp/filter/http_sse_filter_chain_factory.h @@ -66,19 +66,33 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { * @param http_host HTTP Host header value for client mode * @param use_sse True for SSE mode (GET /sse first), false for Streamable * HTTP (direct POST) + * @param sse_path Server-side SSE endpoint path (e.g., "/sse"). Only + * meaningful when is_server=true. + * @param rpc_path Server-side JSON-RPC endpoint path (e.g., "/mcp"). Only + * meaningful when is_server=true. + * @param external_url Absolute URL the server is reachable at from the + * client's perspective. Used to build the endpoint-event + * callback URL advertised on GET /sse. Leave empty to + * derive the URL from the incoming Host header. */ HttpSseFilterChainFactory(event::Dispatcher& dispatcher, McpProtocolCallbacks& message_callbacks, bool is_server = true, const std::string& http_path = "/rpc", const std::string& http_host = "localhost", - bool use_sse = true) + bool use_sse = true, + const std::string& sse_path = "/sse", + const std::string& rpc_path = "/mcp", + const std::string& external_url = "") : dispatcher_(dispatcher), message_callbacks_(message_callbacks), is_server_(is_server), http_path_(http_path), http_host_(http_host), - use_sse_(use_sse) {} + use_sse_(use_sse), + sse_path_(sse_path), + rpc_path_(rpc_path), + external_url_(external_url) {} /** * Create filter chain for the connection @@ -167,9 +181,12 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { event::Dispatcher& dispatcher_; McpProtocolCallbacks& message_callbacks_; bool is_server_; - std::string http_path_; // HTTP request path for client mode - std::string http_host_; // HTTP Host header for client mode - bool use_sse_; // True for SSE mode, false for Streamable HTTP + std::string http_path_; // HTTP request path for client mode + std::string http_host_; // HTTP Host header for client mode + bool use_sse_; // True for SSE mode, false for Streamable HTTP + std::string sse_path_; // Server-side SSE endpoint path (e.g., "/sse") + std::string rpc_path_; // Server-side JSON-RPC endpoint path (e.g., "/mcp") + std::string external_url_; // External URL for absolute SSE callback URLs mutable bool enable_metrics_ = true; // Enable metrics by default // Store filters for lifetime management From 2965aa3c413b86a9edc80c9ea05a822b765cce6b Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 17:03:44 -0700 Subject: [PATCH 3/5] Implement SSE server transport with per-factory session registry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MCP's SSE transport splits each JSON-RPC request/response across two TCP connections on the server: a long-lived GET {sse_path} stream, and short POST /callback/{session_id} connections that carry the request body. The server returns 202 Accepted on the POST and routes the real JSON-RPC response back through the SSE stream registered under the matching session ID. This commit lands the server side of that dance inside HttpSseJsonRpcProtocolFilter. Design notes: - SseSessionRegistry is owned by HttpSseFilterChainFactory, not a process-wide singleton. The upstream draft used a global singleton with a std::mutex; that conflicts with the project rule to prefer state machines and dispatcher-thread invariants over mutex-guarded shared state, leaks session IDs across McpServer instances in the same process, and encourages cross-thread connection.write() calls. A per-factory registry keeps lifetime bounded by the server, and since all filter callbacks on a given server already fire on one dispatcher thread, the map needs no lock — only an assert on each public method to keep that invariant honest if someone later splits the server across workers. - The filter, not the registry, holds the session-ID-to-connection binding lifecycle: registerSession in onHeaders for GET {sse_path}, removeSession in the filter destructor when the SSE stream connection tears down. The destructor runs on the dispatcher thread per the filter/connection contract, so a POST /callback arriving between "SSE connection closed" and "filter destructed" is handled by an already-empty registry lookup returning false — no use-after-free window. - onHeaders sends the SSE prelude (HTTP 200 + text/event-stream headers + the "event: endpoint" line) via connection().write() inline, same for the POST's 202 Accepted. A sse_writing_handshake_ pass-through flag keeps our own onWrite from re-framing those raw bytes as an SSE event when the write chain calls back into us — the alternative (posting through the dispatcher) would race the client reading the handshake before we could register the session in the registry. - onWrite intercepts the JSON-RPC response when sse_callback_session_id_ is set, drains the buffer, hands the JSON to the registry, and returns StopIteration so the response doesn't get written back to the POST connection (which already got its 202). - The filter ctor gains four trailing defaulted params (sse_path, rpc_path, external_url, SseSessionRegistry*). All four default to the values that match pre-existing behavior, so no caller that doesn't care about SSE server transport needs to change. - Factory ctor/destructor are moved out-of-line so the unique_ptr member of the forward-declared SseSessionRegistry class compiles cleanly from the header. The existing HttpSse/HttpsSse/SseCodec/HttpCodecSse test suite (10 cases) continues to pass unchanged — the new transport is opt-in via the GET {sse_path} / POST /callback/{id} URL pattern and back-compat factories that don't use either path see no behavioral change. Integration test for the new round-trip lands in the next commit. --- .../filter/http_sse_filter_chain_factory.h | 26 +- src/filter/http_sse_filter_chain_factory.cc | 347 +++++++++++++++++- 2 files changed, 351 insertions(+), 22 deletions(-) diff --git a/include/mcp/filter/http_sse_filter_chain_factory.h b/include/mcp/filter/http_sse_filter_chain_factory.h index fa6bff23..4d5c9b2c 100644 --- a/include/mcp/filter/http_sse_filter_chain_factory.h +++ b/include/mcp/filter/http_sse_filter_chain_factory.h @@ -18,6 +18,7 @@ class McpProtocolCallbacks; namespace filter { class HttpRoutingFilter; class MetricsFilter; +class SseSessionRegistry; } // namespace filter } // namespace mcp @@ -83,16 +84,11 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { bool use_sse = true, const std::string& sse_path = "/sse", const std::string& rpc_path = "/mcp", - const std::string& external_url = "") - : dispatcher_(dispatcher), - message_callbacks_(message_callbacks), - is_server_(is_server), - http_path_(http_path), - http_host_(http_host), - use_sse_(use_sse), - sse_path_(sse_path), - rpc_path_(rpc_path), - external_url_(external_url) {} + const std::string& external_url = ""); + + // Destructor defined out-of-line so the unique_ptr + // member can use the incomplete forward-declared type in this header. + ~HttpSseFilterChainFactory(); /** * Create filter chain for the connection @@ -189,6 +185,16 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { std::string external_url_; // External URL for absolute SSE callback URLs mutable bool enable_metrics_ = true; // Enable metrics by default + // SSE session registry — maps session IDs to the connections that are + // streaming SSE to each client, so POST /callback/{id} handlers can + // route a JSON-RPC response back through the correct stream. Lazily + // constructed on the first server-side filter chain creation; stays + // null for client-mode factories. Owned here (not a global singleton) + // so each McpServer instance has an isolated registry and lifetime is + // bounded by the factory, not process lifetime. Mutable because + // createFilterChain is const on the base class. + mutable std::unique_ptr sse_registry_; + // Store filters for lifetime management mutable std::vector filters_; diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc index 8b205218..ac630d79 100644 --- a/src/filter/http_sse_filter_chain_factory.cc +++ b/src/filter/http_sse_filter_chain_factory.cc @@ -50,8 +50,12 @@ #include "mcp/filter/http_sse_filter_chain_factory.h" +#include +#include #include +#include #include +#include #include "mcp/filter/http_codec_filter.h" #include "mcp/filter/http_routing_filter.h" @@ -66,6 +70,99 @@ namespace mcp { namespace filter { +// ═══════════════════════════════════════════════════════════════════════════ +// SseSessionRegistry — dispatcher-owned map of SSE session IDs to the +// network::Connection* that is streaming SSE back to each client. +// +// MCP SSE transport splits the request/response pair across two TCP +// connections: +// 1. A long-lived GET /sse stream that the client leaves open for +// server-sent events. The server registers this connection under a +// fresh session ID and announces a POST callback URL containing +// that ID in the "endpoint" event. +// 2. Short POST /callback/{session_id} connections — one per outbound +// JSON-RPC request from the client. The server returns 202 Accepted +// immediately and then routes the actual JSON-RPC response through +// the SSE connection registered under that session ID. +// +// The registry is what lets the POST handler find the SSE connection it +// has to route the response through. It lives on the filter chain +// factory (one per McpServer), not as a process-wide singleton, so: +// - Independent McpServer instances in the same process do not share +// session IDs or leak into each other. +// - Registry lifetime is bounded by the factory, which is owned by +// McpServer — no global state to reason about at shutdown. +// +// Threading model: +// - The MCP server runs on a single dispatcher thread. All filter +// callbacks (onHeaders, onWrite, filter destructor) fire on that +// thread, so registry mutations are naturally single-threaded. +// - Every public method asserts isThreadSafe() so a future move to a +// worker-thread model fails loudly instead of silently corrupting +// the map. +// ═══════════════════════════════════════════════════════════════════════════ +class SseSessionRegistry { + public: + explicit SseSessionRegistry(event::Dispatcher& dispatcher) + : dispatcher_(dispatcher) {} + + // Record an SSE stream connection and hand back a stable session ID. + // Caller is responsible for calling removeSession() when the stream + // closes (filter destructor does this). + std::string registerSession(network::Connection* connection) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::registerSession off-dispatcher-thread"); + std::string session_id = "client_" + std::to_string(next_id_++); + sessions_[session_id] = connection; + GOPHER_LOG_INFO("SSE session registered: {} (total={})", session_id, + sessions_.size()); + return session_id; + } + + // Drop a session. Safe to call with an unknown ID (no-op). Typically + // invoked from the SSE filter's destructor when the stream connection + // closes. + void removeSession(const std::string& session_id) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::removeSession off-dispatcher-thread"); + if (sessions_.erase(session_id) > 0) { + GOPHER_LOG_INFO("SSE session removed: {} (total={})", session_id, + sessions_.size()); + } + } + + // Write a JSON-RPC response through the SSE stream registered under + // session_id. Returns true if the session existed and the write was + // handed to the connection (the actual wire bytes are framed into a + // `data: ...\n\n` SSE event by the SSE codec filter further down that + // connection's write chain). Returns false if the session has gone + // away (e.g. client already disconnected while we were handling the + // POST), in which case the caller should drop the response on the + // floor rather than pretending it was delivered. + bool sendResponse(const std::string& session_id, + const std::string& json_data) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::sendResponse off-dispatcher-thread"); + auto it = sessions_.find(session_id); + if (it == sessions_.end()) { + GOPHER_LOG_WARN("SSE session not found for response routing: {}", + session_id); + return false; + } + OwnedBuffer buffer; + buffer.add(json_data.c_str(), json_data.length()); + it->second->write(buffer, /*end_stream=*/false); + GOPHER_LOG_DEBUG("SSE response routed to session {} ({} bytes)", + session_id, json_data.size()); + return true; + } + + private: + event::Dispatcher& dispatcher_; + std::map sessions_; + uint64_t next_id_{1}; +}; + // Forward declaration class HttpSseJsonRpcProtocolFilter; @@ -140,13 +237,21 @@ class HttpSseJsonRpcProtocolFilter const std::string& http_path = "/rpc", const std::string& http_host = "localhost", bool use_sse = true, - const HttpRouteRegistrationCallback& route_callback = nullptr) + const HttpRouteRegistrationCallback& route_callback = nullptr, + const std::string& configured_sse_path = "/sse", + const std::string& configured_rpc_path = "/mcp", + const std::string& configured_external_url = "", + SseSessionRegistry* sse_registry = nullptr) : dispatcher_(dispatcher), mcp_callbacks_(mcp_callbacks), is_server_(is_server), http_path_(http_path), http_host_(http_host), use_sse_(use_sse), + configured_sse_path_(configured_sse_path), + configured_rpc_path_(configured_rpc_path), + configured_external_url_(configured_external_url), + sse_registry_(sse_registry), route_registration_callback_(route_callback) { // Following production pattern: all operations for this filter // happen in the single dispatcher thread @@ -188,7 +293,16 @@ class HttpSseJsonRpcProtocolFilter std::make_shared(*this, dispatcher_, is_server_); } - ~HttpSseJsonRpcProtocolFilter() = default; + ~HttpSseJsonRpcProtocolFilter() { + // SSE stream connection is closing — drop this session from the + // registry so a POST /callback/{id} that arrives between close and + // destructor doesn't route a response into a dead connection. Runs + // on the dispatcher thread (filters destruct on-thread by contract). + if (sse_registry_ && !sse_session_id_.empty()) { + sse_registry_->removeSession(sse_session_id_); + sse_session_id_.clear(); + } + } // ===== Network Filter Interface ===== @@ -295,6 +409,39 @@ class HttpSseJsonRpcProtocolFilter * recursion! */ network::FilterStatus onWrite(Buffer& data, bool end_stream) override { + // SSE handshake bypass: we're inside a connection().write() call + // made from our own onHeaders (writing "HTTP/1.1 200 OK ... event: + // endpoint" for GET /sse, or "HTTP/1.1 202 Accepted" for POST + // /callback). The bytes are already fully-formed HTTP — skip all + // downstream JSON-RPC / HTTP-codec framing. + if (sse_writing_handshake_) { + return network::FilterStatus::Continue; + } + + // SSE callback routing: the JSON-RPC filter is emitting the response + // for a POST /callback/{id}. We already sent 202 Accepted on that + // POST connection, so we must NOT let these bytes continue down the + // write chain to be framed as HTTP and written back to the POST + // connection. Instead, pull the JSON body out and hand it to the + // registry, which writes it through the matching SSE stream. + if (is_server_ && !sse_callback_session_id_.empty() && data.length() > 0) { + const size_t len = data.length(); + std::string json_data(static_cast(data.linearize(len)), len); + data.drain(len); + + if (sse_registry_) { + sse_registry_->sendResponse(sse_callback_session_id_, json_data); + } else { + GOPHER_LOG_WARN( + "SSE callback response dropped: no registry available (session={})", + sse_callback_session_id_); + } + + // Stop iteration so ConnectionImpl doesn't flush an empty buffer + // as HTTP bytes on the POST connection. + return network::FilterStatus::StopIteration; + } + GOPHER_LOG_DEBUG( "HttpSseJsonRpcProtocolFilter: onWrite called, data_len={}, " "is_server={}, is_sse_mode={}, waiting_for_endpoint={}, " @@ -507,20 +654,131 @@ class HttpSseJsonRpcProtocolFilter // Determine transport mode based on headers if (is_server_) { - // Server: check Accept header for SSE - but DON'T enable SSE mode yet! - // The Accept header just indicates the client SUPPORTS SSE, not that we - // should use it. For JSON-RPC request/response (like initialize), we - // should use normal HTTP with Content-Length. - // SSE mode should only be enabled explicitly when the server wants to - // stream notifications. For now, always use normal HTTP for responses. + // Pull :method and :path out of the pseudo-headers so we can branch + // on the two SSE transport endpoints: GET {sse_path} (open stream) + // and POST /callback/{session_id} (route response via stream). + std::string method = "GET"; + auto method_it = headers.find(":method"); + if (method_it != headers.end()) { + method = method_it->second; + } + + std::string path = "/"; + auto path_it = headers.find(":path"); + if (path_it != headers.end()) { + path = path_it->second; + } else { + // Some codecs surface the request target as "url" rather than the + // HTTP/2-style :path pseudo-header. Accept either. + auto url_it = headers.find("url"); + if (url_it != headers.end()) { + path = url_it->second; + } + } + // Trim query string before matching — the SSE transport doesn't + // use query params on either endpoint and we want /sse?foo=bar to + // still open the stream. + size_t qpos = path.find('?'); + if (qpos != std::string::npos) { + path = path.substr(0, qpos); + } + + // ── GET {configured_sse_path_} → open an SSE stream. + if (method == "GET" && path == configured_sse_path_ && sse_registry_ && + write_callbacks_) { + client_accepts_sse_ = true; + sse_session_id_ = + sse_registry_->registerSession(&write_callbacks_->connection()); + + // Build the callback URL the client will POST future requests + // to. If an external URL is configured (reverse-proxy case) we + // announce an absolute URL so the client doesn't try to guess. + // Otherwise emit a relative path — it resolves relative to the + // SSE URL on the client side and keeps us agnostic to scheme or + // host. + std::string callback_url; + if (!configured_external_url_.empty()) { + std::string base = configured_external_url_; + if (!base.empty() && base.back() == '/') { + base.pop_back(); + } + callback_url = base + "/callback/" + sse_session_id_; + } else { + callback_url = "callback/" + sse_session_id_; + } + + // Write the SSE response prelude + endpoint event straight onto + // the wire. We set sse_writing_handshake_ so our own onWrite + // passes these bytes through untouched — otherwise the write + // filter chain would try to re-frame the raw HTTP bytes as an + // SSE event and corrupt the stream. + std::ostringstream sse_response; + sse_response << "HTTP/1.1 200 OK\r\n"; + sse_response << "Content-Type: text/event-stream\r\n"; + sse_response << "Cache-Control: no-cache\r\n"; + sse_response << "Access-Control-Allow-Origin: *\r\n"; + sse_response << "\r\n"; + sse_response << "event: endpoint\n"; + sse_response << "data: " << callback_url << "\n\n"; + + const std::string response_str = sse_response.str(); + OwnedBuffer response_buffer; + response_buffer.add(response_str.c_str(), response_str.length()); + + sse_writing_handshake_ = true; + write_callbacks_->connection().write(response_buffer, false); + sse_writing_handshake_ = false; + + sse_server_mode_ = true; + is_sse_mode_ = true; + sse_headers_written_ = true; + + GOPHER_LOG_INFO("SSE stream opened: session={} callback_url={}", + sse_session_id_, callback_url); + return; + } + + // ── POST /callback/{session_id} → route the JSON-RPC body + // through the SSE stream registered under {session_id}. We send + // 202 Accepted on this POST connection right away and let the + // body keep flowing into the JSON-RPC filter normally; onWrite + // then intercepts the response before it gets written back to + // this POST connection and redirects it to the SSE stream. + const std::string callback_prefix = "/callback/"; + if (method == "POST" && path.compare(0, callback_prefix.size(), + callback_prefix) == 0) { + sse_callback_session_id_ = path.substr(callback_prefix.size()); + GOPHER_LOG_DEBUG("SSE callback POST: session={}", + sse_callback_session_id_); + + if (write_callbacks_) { + const std::string http_202 = + "HTTP/1.1 202 Accepted\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n"; + OwnedBuffer resp_buf; + resp_buf.add(http_202.c_str(), http_202.length()); + sse_writing_handshake_ = true; + write_callbacks_->connection().write(resp_buf, false); + sse_writing_handshake_ = false; + } + // Keep is_sse_mode_ off on this connection — the body is plain + // JSON-RPC, not an SSE stream. Response routing happens in + // onWrite via sse_callback_session_id_. + is_sse_mode_ = false; + return; + } + + // Non-SSE-transport request (POST /mcp Streamable HTTP, /health, + // /info, etc.). Preserve the existing behavior: check Accept for + // SSE capability but keep plain HTTP mode for request/response. auto accept = headers.find("accept"); if (accept != headers.end() && accept->second.find("text/event-stream") != std::string::npos) { - client_accepts_sse_ = true; // Track that client supports SSE - // But don't set is_sse_mode_ = true here - that would break JSON-RPC + client_accepts_sse_ = true; GOPHER_LOG_DEBUG("HttpSseJsonRpcProtocolFilter: client accepts SSE"); } - // Always use normal HTTP mode for request/response pattern is_sse_mode_ = false; } else { // Client: check Content-Type for SSE @@ -537,6 +795,13 @@ class HttpSseJsonRpcProtocolFilter } void onBody(const std::string& data, bool end_stream) override { + // The long-lived GET /sse request has no request body — but if the + // codec surfaces any trailing bytes we don't want to push them down + // into the JSON-RPC parser. Ignore bodies on the SSE stream + // connection entirely. + if (is_server_ && sse_server_mode_) { + return; + } // Server receives JSON-RPC in request body regardless of SSE mode // SSE mode only affects the response format if (is_server_) { @@ -912,6 +1177,30 @@ class HttpSseJsonRpcProtocolFilter std::string http_host_{"localhost"}; // Default HTTP host for requests bool use_sse_{true}; // True for SSE mode, false for Streamable HTTP + // SSE server transport (only meaningful when is_server_ == true). + std::string configured_sse_path_{"/sse"}; + std::string configured_rpc_path_{"/mcp"}; + std::string configured_external_url_; + // Registry of live SSE session IDs → their stream connections. Shared + // with sibling filter instances on the same factory; null in client + // mode and in server mode when the factory wasn't built with SSE + // server transport (back-compat default constructors). + SseSessionRegistry* sse_registry_{nullptr}; + // This connection serves the long-lived GET /sse stream. + bool sse_server_mode_{false}; + // Pass-through guard: true while we're inside onHeaders writing raw + // HTTP bytes (handshake or 202 Accepted) via connection().write(), so + // our own onWrite doesn't try to re-frame those bytes. + bool sse_writing_handshake_{false}; + // Session ID this connection's SSE stream is registered under. + // Populated when we handle GET /sse, cleared in the destructor. + std::string sse_session_id_; + // Session ID parsed from an incoming POST /callback/{id}. Non-empty + // means onWrite should redirect the response through the SSE stream + // registered under this ID instead of writing back to the POST + // connection. + std::string sse_callback_session_id_; + // SSE endpoint negotiation (client mode only) bool waiting_for_sse_endpoint_{false}; // Waiting for "endpoint" SSE event std::vector @@ -973,6 +1262,30 @@ void HttpSseFilterChainFactory::sendHttpResponse( // ===== Factory Implementation ===== +HttpSseFilterChainFactory::HttpSseFilterChainFactory( + event::Dispatcher& dispatcher, + McpProtocolCallbacks& message_callbacks, + bool is_server, + const std::string& http_path, + const std::string& http_host, + bool use_sse, + const std::string& sse_path, + const std::string& rpc_path, + const std::string& external_url) + : dispatcher_(dispatcher), + message_callbacks_(message_callbacks), + is_server_(is_server), + http_path_(http_path), + http_host_(http_host), + use_sse_(use_sse), + sse_path_(sse_path), + rpc_path_(rpc_path), + external_url_(external_url) {} + +// Out-of-line destructor so the unique_ptr member +// can see the complete type from this translation unit. +HttpSseFilterChainFactory::~HttpSseFilterChainFactory() = default; + bool HttpSseFilterChainFactory::createFilterChain( network::FilterManager& filter_manager) const { // Following production pattern: create filters in order @@ -1025,12 +1338,22 @@ bool HttpSseFilterChainFactory::createFilterChain( // Routing is now integrated into the combined filter // No separate routing filter needed + // Lazily construct the SSE session registry on the first server-side + // filter chain creation. Client-mode factories never touch it, and + // back-compat server-mode factories that don't use the SSE transport + // still carry a registry — it just sits empty because registerSession + // is only called from GET {configured_sse_path_}. + if (is_server_ && !sse_registry_) { + sse_registry_.reset(new SseSessionRegistry(dispatcher_)); + } + // Create the combined protocol filter // Pass the route registration callback so custom HTTP routes can be // registered auto combined_filter = std::make_shared( dispatcher_, message_callbacks_, is_server_, http_path_, http_host_, - use_sse_, route_registration_callback_); + use_sse_, route_registration_callback_, sse_path_, rpc_path_, + external_url_, sse_registry_.get()); // Add as both read and write filter filter_manager.addReadFilter(combined_filter); From 603361c85cdd07e5bd5734714de41ccd75730c28 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 17:05:42 -0700 Subject: [PATCH 4/5] Wire McpServerConfig endpoint paths into HttpSseFilterChainFactory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit McpServerConfig already carries http_sse_path, http_rpc_path, and the new external_url (added earlier in this stack), but the fallback "default HTTP/SSE filter chain factory" construction path was still building the factory with no path arguments — so the SSE server transport was hardcoded to /sse and /mcp with no external URL, regardless of what the operator configured. Forward all three fields through the ctor. Also set http_path to http_rpc_path explicitly so a server that uses this fallback path surfaces a consistent RPC endpoint; http_host stays "localhost" because it's the client-mode Host header and irrelevant in server mode. Note: upstream's version of this change also rewrote the active_connections_ cleanup on close to post-based deferral, but the PR #212 crash-fix stack already replaced that code with Dispatcher::deferredDelete, which is the correct primitive here and handles the use-after-free window that post() only partially covers. Intentionally skipping that part of the upstream diff. --- src/server/mcp_server.cc | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/server/mcp_server.cc b/src/server/mcp_server.cc index 8abd2eef..fa4e69ce 100644 --- a/src/server/mcp_server.cc +++ b/src/server/mcp_server.cc @@ -217,9 +217,23 @@ void McpServer::performListen() { // backward compatibility GOPHER_LOG_INFO("Using default HTTP/SSE filter chain factory"); + // Pass the configured endpoint paths (and optional external_url + // for reverse-proxy deployments) through to the filter chain so + // the SSE server transport advertises callbacks under whatever + // paths the operator picked in McpServerConfig. The trailing + // defaults of the ctor cover use_sse and the client-mode + // http_path/http_host — those are unused in server mode but + // still positional in the signature. auto http_sse_factory = std::make_shared( - *main_dispatcher_, *protocol_callbacks_); + *main_dispatcher_, *protocol_callbacks_, + /*is_server=*/true, + /*http_path=*/config_.http_rpc_path, + /*http_host=*/"localhost", + /*use_sse=*/true, + /*sse_path=*/config_.http_sse_path, + /*rpc_path=*/config_.http_rpc_path, + /*external_url=*/config_.external_url); // Add filter factories from config (e.g., auth filters) // This follows the existing FilterFactoryCb pattern From a8b307ba1f7df276381e54cac461630e20a93b61 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 17:07:20 -0700 Subject: [PATCH 5/5] Match POST /callback/{id} under reverse-proxy path prefixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a server sits behind a reverse proxy that terminates TLS and rewrites paths (Traefik, nginx, an ingress controller), external_url is typically set to the full proxy-side URL — e.g. https://gateway.example.com/v1/mcp/gateways/abc123. The endpoint event then announces the callback URL under that prefix: data: https://gateway.example.com/v1/mcp/gateways/abc123/callback/client_1 and the client POSTs to /v1/mcp/gateways/abc123/callback/client_1. The proxy forwards the full path through to the server, so the incoming request's :path does not start with /callback/. Switch the match from "path begins with /callback/" to "path contains /callback/ anywhere" (rfind), and strip through the last /callback/ occurrence to recover the session ID from the tail. Direct (non-proxy) deployments still work because /callback/{id} has only one /callback/ and rfind lands at position 0. Skipping the request-log line from the upstream version of this fix — it's debug noise, not part of the routing change. --- src/filter/http_sse_filter_chain_factory.cc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc index ac630d79..c5bad153 100644 --- a/src/filter/http_sse_filter_chain_factory.cc +++ b/src/filter/http_sse_filter_chain_factory.cc @@ -738,16 +738,23 @@ class HttpSseJsonRpcProtocolFilter return; } - // ── POST /callback/{session_id} → route the JSON-RPC body + // ── POST .../callback/{session_id} → route the JSON-RPC body // through the SSE stream registered under {session_id}. We send // 202 Accepted on this POST connection right away and let the // body keep flowing into the JSON-RPC filter normally; onWrite // then intercepts the response before it gets written back to // this POST connection and redirects it to the SSE stream. + // + // Use rfind to accept a path prefix so deployments behind a + // reverse proxy still match. If external_url announces a + // callback at /v1/mcp/gateways/xyz/callback/client_1 and the + // proxy passes that full path through, we still want to strip + // everything up to and including /callback/ and take the + // session ID from the tail. const std::string callback_prefix = "/callback/"; - if (method == "POST" && path.compare(0, callback_prefix.size(), - callback_prefix) == 0) { - sse_callback_session_id_ = path.substr(callback_prefix.size()); + const auto cb_pos = path.rfind(callback_prefix); + if (method == "POST" && cb_pos != std::string::npos) { + sse_callback_session_id_ = path.substr(cb_pos + callback_prefix.size()); GOPHER_LOG_DEBUG("SSE callback POST: session={}", sse_callback_session_id_);