diff --git a/include/mcp/filter/http_sse_filter_chain_factory.h b/include/mcp/filter/http_sse_filter_chain_factory.h index 0f67c9f1..4d5c9b2c 100644 --- a/include/mcp/filter/http_sse_filter_chain_factory.h +++ b/include/mcp/filter/http_sse_filter_chain_factory.h @@ -18,6 +18,7 @@ class McpProtocolCallbacks; namespace filter { class HttpRoutingFilter; class MetricsFilter; +class SseSessionRegistry; } // namespace filter } // namespace mcp @@ -66,19 +67,28 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { * @param http_host HTTP Host header value for client mode * @param use_sse True for SSE mode (GET /sse first), false for Streamable * HTTP (direct POST) + * @param sse_path Server-side SSE endpoint path (e.g., "/sse"). Only + * meaningful when is_server=true. + * @param rpc_path Server-side JSON-RPC endpoint path (e.g., "/mcp"). Only + * meaningful when is_server=true. + * @param external_url Absolute URL the server is reachable at from the + * client's perspective. Used to build the endpoint-event + * callback URL advertised on GET /sse. Leave empty to + * derive the URL from the incoming Host header. */ HttpSseFilterChainFactory(event::Dispatcher& dispatcher, McpProtocolCallbacks& message_callbacks, bool is_server = true, const std::string& http_path = "/rpc", const std::string& http_host = "localhost", - bool use_sse = true) - : dispatcher_(dispatcher), - message_callbacks_(message_callbacks), - is_server_(is_server), - http_path_(http_path), - http_host_(http_host), - use_sse_(use_sse) {} + bool use_sse = true, + const std::string& sse_path = "/sse", + const std::string& rpc_path = "/mcp", + const std::string& external_url = ""); + + // Destructor defined out-of-line so the unique_ptr + // member can use the incomplete forward-declared type in this header. + ~HttpSseFilterChainFactory(); /** * Create filter chain for the connection @@ -167,11 +177,24 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { event::Dispatcher& dispatcher_; McpProtocolCallbacks& message_callbacks_; bool is_server_; - std::string http_path_; // HTTP request path for client mode - std::string http_host_; // HTTP Host header for client mode - bool use_sse_; // True for SSE mode, false for Streamable HTTP + std::string http_path_; // HTTP request path for client mode + std::string http_host_; // HTTP Host header for client mode + bool use_sse_; // True for SSE mode, false for Streamable HTTP + std::string sse_path_; // Server-side SSE endpoint path (e.g., "/sse") + std::string rpc_path_; // Server-side JSON-RPC endpoint path (e.g., "/mcp") + std::string external_url_; // External URL for absolute SSE callback URLs mutable bool enable_metrics_ = true; // Enable metrics by default + // SSE session registry — maps session IDs to the connections that are + // streaming SSE to each client, so POST /callback/{id} handlers can + // route a JSON-RPC response back through the correct stream. Lazily + // constructed on the first server-side filter chain creation; stays + // null for client-mode factories. Owned here (not a global singleton) + // so each McpServer instance has an isolated registry and lifetime is + // bounded by the factory, not process lifetime. Mutable because + // createFilterChain is const on the base class. + mutable std::unique_ptr sse_registry_; + // Store filters for lifetime management mutable std::vector filters_; diff --git a/include/mcp/server/mcp_server.h b/include/mcp/server/mcp_server.h index acaa1eaf..bce793f2 100644 --- a/include/mcp/server/mcp_server.h +++ b/include/mcp/server/mcp_server.h @@ -88,8 +88,15 @@ struct McpServerConfig : public application::ApplicationBase::Config { // HTTP/SSE specific configuration std::string http_rpc_path = "/rpc"; // Path for JSON-RPC over HTTP - std::string http_sse_path = "/events"; // Path for SSE event stream + std::string http_sse_path = "/sse"; // Path for SSE event stream std::string http_health_path = "/health"; // Path for health check endpoint + // Absolute URL the server is reachable at from the client's perspective + // (scheme + host + port + optional path prefix). Used to build the + // endpoint-event callback URL advertised on GET /sse. Leave empty to + // have the server derive a URL from the incoming Host header; set + // explicitly when the server sits behind a reverse proxy that rewrites + // scheme or path so clients don't try to POST back to an internal URL. + std::string external_url; // Session management size_t max_sessions = 100; diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc index 8b205218..c5bad153 100644 --- a/src/filter/http_sse_filter_chain_factory.cc +++ b/src/filter/http_sse_filter_chain_factory.cc @@ -50,8 +50,12 @@ #include "mcp/filter/http_sse_filter_chain_factory.h" +#include +#include #include +#include #include +#include #include "mcp/filter/http_codec_filter.h" #include "mcp/filter/http_routing_filter.h" @@ -66,6 +70,99 @@ namespace mcp { namespace filter { +// ═══════════════════════════════════════════════════════════════════════════ +// SseSessionRegistry — dispatcher-owned map of SSE session IDs to the +// network::Connection* that is streaming SSE back to each client. +// +// MCP SSE transport splits the request/response pair across two TCP +// connections: +// 1. A long-lived GET /sse stream that the client leaves open for +// server-sent events. The server registers this connection under a +// fresh session ID and announces a POST callback URL containing +// that ID in the "endpoint" event. +// 2. Short POST /callback/{session_id} connections — one per outbound +// JSON-RPC request from the client. The server returns 202 Accepted +// immediately and then routes the actual JSON-RPC response through +// the SSE connection registered under that session ID. +// +// The registry is what lets the POST handler find the SSE connection it +// has to route the response through. It lives on the filter chain +// factory (one per McpServer), not as a process-wide singleton, so: +// - Independent McpServer instances in the same process do not share +// session IDs or leak into each other. +// - Registry lifetime is bounded by the factory, which is owned by +// McpServer — no global state to reason about at shutdown. +// +// Threading model: +// - The MCP server runs on a single dispatcher thread. All filter +// callbacks (onHeaders, onWrite, filter destructor) fire on that +// thread, so registry mutations are naturally single-threaded. +// - Every public method asserts isThreadSafe() so a future move to a +// worker-thread model fails loudly instead of silently corrupting +// the map. +// ═══════════════════════════════════════════════════════════════════════════ +class SseSessionRegistry { + public: + explicit SseSessionRegistry(event::Dispatcher& dispatcher) + : dispatcher_(dispatcher) {} + + // Record an SSE stream connection and hand back a stable session ID. + // Caller is responsible for calling removeSession() when the stream + // closes (filter destructor does this). + std::string registerSession(network::Connection* connection) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::registerSession off-dispatcher-thread"); + std::string session_id = "client_" + std::to_string(next_id_++); + sessions_[session_id] = connection; + GOPHER_LOG_INFO("SSE session registered: {} (total={})", session_id, + sessions_.size()); + return session_id; + } + + // Drop a session. Safe to call with an unknown ID (no-op). Typically + // invoked from the SSE filter's destructor when the stream connection + // closes. + void removeSession(const std::string& session_id) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::removeSession off-dispatcher-thread"); + if (sessions_.erase(session_id) > 0) { + GOPHER_LOG_INFO("SSE session removed: {} (total={})", session_id, + sessions_.size()); + } + } + + // Write a JSON-RPC response through the SSE stream registered under + // session_id. Returns true if the session existed and the write was + // handed to the connection (the actual wire bytes are framed into a + // `data: ...\n\n` SSE event by the SSE codec filter further down that + // connection's write chain). Returns false if the session has gone + // away (e.g. client already disconnected while we were handling the + // POST), in which case the caller should drop the response on the + // floor rather than pretending it was delivered. + bool sendResponse(const std::string& session_id, + const std::string& json_data) { + assert(dispatcher_.isThreadSafe() && + "SseSessionRegistry::sendResponse off-dispatcher-thread"); + auto it = sessions_.find(session_id); + if (it == sessions_.end()) { + GOPHER_LOG_WARN("SSE session not found for response routing: {}", + session_id); + return false; + } + OwnedBuffer buffer; + buffer.add(json_data.c_str(), json_data.length()); + it->second->write(buffer, /*end_stream=*/false); + GOPHER_LOG_DEBUG("SSE response routed to session {} ({} bytes)", + session_id, json_data.size()); + return true; + } + + private: + event::Dispatcher& dispatcher_; + std::map sessions_; + uint64_t next_id_{1}; +}; + // Forward declaration class HttpSseJsonRpcProtocolFilter; @@ -140,13 +237,21 @@ class HttpSseJsonRpcProtocolFilter const std::string& http_path = "/rpc", const std::string& http_host = "localhost", bool use_sse = true, - const HttpRouteRegistrationCallback& route_callback = nullptr) + const HttpRouteRegistrationCallback& route_callback = nullptr, + const std::string& configured_sse_path = "/sse", + const std::string& configured_rpc_path = "/mcp", + const std::string& configured_external_url = "", + SseSessionRegistry* sse_registry = nullptr) : dispatcher_(dispatcher), mcp_callbacks_(mcp_callbacks), is_server_(is_server), http_path_(http_path), http_host_(http_host), use_sse_(use_sse), + configured_sse_path_(configured_sse_path), + configured_rpc_path_(configured_rpc_path), + configured_external_url_(configured_external_url), + sse_registry_(sse_registry), route_registration_callback_(route_callback) { // Following production pattern: all operations for this filter // happen in the single dispatcher thread @@ -188,7 +293,16 @@ class HttpSseJsonRpcProtocolFilter std::make_shared(*this, dispatcher_, is_server_); } - ~HttpSseJsonRpcProtocolFilter() = default; + ~HttpSseJsonRpcProtocolFilter() { + // SSE stream connection is closing — drop this session from the + // registry so a POST /callback/{id} that arrives between close and + // destructor doesn't route a response into a dead connection. Runs + // on the dispatcher thread (filters destruct on-thread by contract). + if (sse_registry_ && !sse_session_id_.empty()) { + sse_registry_->removeSession(sse_session_id_); + sse_session_id_.clear(); + } + } // ===== Network Filter Interface ===== @@ -295,6 +409,39 @@ class HttpSseJsonRpcProtocolFilter * recursion! */ network::FilterStatus onWrite(Buffer& data, bool end_stream) override { + // SSE handshake bypass: we're inside a connection().write() call + // made from our own onHeaders (writing "HTTP/1.1 200 OK ... event: + // endpoint" for GET /sse, or "HTTP/1.1 202 Accepted" for POST + // /callback). The bytes are already fully-formed HTTP — skip all + // downstream JSON-RPC / HTTP-codec framing. + if (sse_writing_handshake_) { + return network::FilterStatus::Continue; + } + + // SSE callback routing: the JSON-RPC filter is emitting the response + // for a POST /callback/{id}. We already sent 202 Accepted on that + // POST connection, so we must NOT let these bytes continue down the + // write chain to be framed as HTTP and written back to the POST + // connection. Instead, pull the JSON body out and hand it to the + // registry, which writes it through the matching SSE stream. + if (is_server_ && !sse_callback_session_id_.empty() && data.length() > 0) { + const size_t len = data.length(); + std::string json_data(static_cast(data.linearize(len)), len); + data.drain(len); + + if (sse_registry_) { + sse_registry_->sendResponse(sse_callback_session_id_, json_data); + } else { + GOPHER_LOG_WARN( + "SSE callback response dropped: no registry available (session={})", + sse_callback_session_id_); + } + + // Stop iteration so ConnectionImpl doesn't flush an empty buffer + // as HTTP bytes on the POST connection. + return network::FilterStatus::StopIteration; + } + GOPHER_LOG_DEBUG( "HttpSseJsonRpcProtocolFilter: onWrite called, data_len={}, " "is_server={}, is_sse_mode={}, waiting_for_endpoint={}, " @@ -507,20 +654,138 @@ class HttpSseJsonRpcProtocolFilter // Determine transport mode based on headers if (is_server_) { - // Server: check Accept header for SSE - but DON'T enable SSE mode yet! - // The Accept header just indicates the client SUPPORTS SSE, not that we - // should use it. For JSON-RPC request/response (like initialize), we - // should use normal HTTP with Content-Length. - // SSE mode should only be enabled explicitly when the server wants to - // stream notifications. For now, always use normal HTTP for responses. + // Pull :method and :path out of the pseudo-headers so we can branch + // on the two SSE transport endpoints: GET {sse_path} (open stream) + // and POST /callback/{session_id} (route response via stream). + std::string method = "GET"; + auto method_it = headers.find(":method"); + if (method_it != headers.end()) { + method = method_it->second; + } + + std::string path = "/"; + auto path_it = headers.find(":path"); + if (path_it != headers.end()) { + path = path_it->second; + } else { + // Some codecs surface the request target as "url" rather than the + // HTTP/2-style :path pseudo-header. Accept either. + auto url_it = headers.find("url"); + if (url_it != headers.end()) { + path = url_it->second; + } + } + // Trim query string before matching — the SSE transport doesn't + // use query params on either endpoint and we want /sse?foo=bar to + // still open the stream. + size_t qpos = path.find('?'); + if (qpos != std::string::npos) { + path = path.substr(0, qpos); + } + + // ── GET {configured_sse_path_} → open an SSE stream. + if (method == "GET" && path == configured_sse_path_ && sse_registry_ && + write_callbacks_) { + client_accepts_sse_ = true; + sse_session_id_ = + sse_registry_->registerSession(&write_callbacks_->connection()); + + // Build the callback URL the client will POST future requests + // to. If an external URL is configured (reverse-proxy case) we + // announce an absolute URL so the client doesn't try to guess. + // Otherwise emit a relative path — it resolves relative to the + // SSE URL on the client side and keeps us agnostic to scheme or + // host. + std::string callback_url; + if (!configured_external_url_.empty()) { + std::string base = configured_external_url_; + if (!base.empty() && base.back() == '/') { + base.pop_back(); + } + callback_url = base + "/callback/" + sse_session_id_; + } else { + callback_url = "callback/" + sse_session_id_; + } + + // Write the SSE response prelude + endpoint event straight onto + // the wire. We set sse_writing_handshake_ so our own onWrite + // passes these bytes through untouched — otherwise the write + // filter chain would try to re-frame the raw HTTP bytes as an + // SSE event and corrupt the stream. + std::ostringstream sse_response; + sse_response << "HTTP/1.1 200 OK\r\n"; + sse_response << "Content-Type: text/event-stream\r\n"; + sse_response << "Cache-Control: no-cache\r\n"; + sse_response << "Access-Control-Allow-Origin: *\r\n"; + sse_response << "\r\n"; + sse_response << "event: endpoint\n"; + sse_response << "data: " << callback_url << "\n\n"; + + const std::string response_str = sse_response.str(); + OwnedBuffer response_buffer; + response_buffer.add(response_str.c_str(), response_str.length()); + + sse_writing_handshake_ = true; + write_callbacks_->connection().write(response_buffer, false); + sse_writing_handshake_ = false; + + sse_server_mode_ = true; + is_sse_mode_ = true; + sse_headers_written_ = true; + + GOPHER_LOG_INFO("SSE stream opened: session={} callback_url={}", + sse_session_id_, callback_url); + return; + } + + // ── POST .../callback/{session_id} → route the JSON-RPC body + // through the SSE stream registered under {session_id}. We send + // 202 Accepted on this POST connection right away and let the + // body keep flowing into the JSON-RPC filter normally; onWrite + // then intercepts the response before it gets written back to + // this POST connection and redirects it to the SSE stream. + // + // Use rfind to accept a path prefix so deployments behind a + // reverse proxy still match. If external_url announces a + // callback at /v1/mcp/gateways/xyz/callback/client_1 and the + // proxy passes that full path through, we still want to strip + // everything up to and including /callback/ and take the + // session ID from the tail. + const std::string callback_prefix = "/callback/"; + const auto cb_pos = path.rfind(callback_prefix); + if (method == "POST" && cb_pos != std::string::npos) { + sse_callback_session_id_ = path.substr(cb_pos + callback_prefix.size()); + GOPHER_LOG_DEBUG("SSE callback POST: session={}", + sse_callback_session_id_); + + if (write_callbacks_) { + const std::string http_202 = + "HTTP/1.1 202 Accepted\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n"; + OwnedBuffer resp_buf; + resp_buf.add(http_202.c_str(), http_202.length()); + sse_writing_handshake_ = true; + write_callbacks_->connection().write(resp_buf, false); + sse_writing_handshake_ = false; + } + // Keep is_sse_mode_ off on this connection — the body is plain + // JSON-RPC, not an SSE stream. Response routing happens in + // onWrite via sse_callback_session_id_. + is_sse_mode_ = false; + return; + } + + // Non-SSE-transport request (POST /mcp Streamable HTTP, /health, + // /info, etc.). Preserve the existing behavior: check Accept for + // SSE capability but keep plain HTTP mode for request/response. auto accept = headers.find("accept"); if (accept != headers.end() && accept->second.find("text/event-stream") != std::string::npos) { - client_accepts_sse_ = true; // Track that client supports SSE - // But don't set is_sse_mode_ = true here - that would break JSON-RPC + client_accepts_sse_ = true; GOPHER_LOG_DEBUG("HttpSseJsonRpcProtocolFilter: client accepts SSE"); } - // Always use normal HTTP mode for request/response pattern is_sse_mode_ = false; } else { // Client: check Content-Type for SSE @@ -537,6 +802,13 @@ class HttpSseJsonRpcProtocolFilter } void onBody(const std::string& data, bool end_stream) override { + // The long-lived GET /sse request has no request body — but if the + // codec surfaces any trailing bytes we don't want to push them down + // into the JSON-RPC parser. Ignore bodies on the SSE stream + // connection entirely. + if (is_server_ && sse_server_mode_) { + return; + } // Server receives JSON-RPC in request body regardless of SSE mode // SSE mode only affects the response format if (is_server_) { @@ -912,6 +1184,30 @@ class HttpSseJsonRpcProtocolFilter std::string http_host_{"localhost"}; // Default HTTP host for requests bool use_sse_{true}; // True for SSE mode, false for Streamable HTTP + // SSE server transport (only meaningful when is_server_ == true). + std::string configured_sse_path_{"/sse"}; + std::string configured_rpc_path_{"/mcp"}; + std::string configured_external_url_; + // Registry of live SSE session IDs → their stream connections. Shared + // with sibling filter instances on the same factory; null in client + // mode and in server mode when the factory wasn't built with SSE + // server transport (back-compat default constructors). + SseSessionRegistry* sse_registry_{nullptr}; + // This connection serves the long-lived GET /sse stream. + bool sse_server_mode_{false}; + // Pass-through guard: true while we're inside onHeaders writing raw + // HTTP bytes (handshake or 202 Accepted) via connection().write(), so + // our own onWrite doesn't try to re-frame those bytes. + bool sse_writing_handshake_{false}; + // Session ID this connection's SSE stream is registered under. + // Populated when we handle GET /sse, cleared in the destructor. + std::string sse_session_id_; + // Session ID parsed from an incoming POST /callback/{id}. Non-empty + // means onWrite should redirect the response through the SSE stream + // registered under this ID instead of writing back to the POST + // connection. + std::string sse_callback_session_id_; + // SSE endpoint negotiation (client mode only) bool waiting_for_sse_endpoint_{false}; // Waiting for "endpoint" SSE event std::vector @@ -973,6 +1269,30 @@ void HttpSseFilterChainFactory::sendHttpResponse( // ===== Factory Implementation ===== +HttpSseFilterChainFactory::HttpSseFilterChainFactory( + event::Dispatcher& dispatcher, + McpProtocolCallbacks& message_callbacks, + bool is_server, + const std::string& http_path, + const std::string& http_host, + bool use_sse, + const std::string& sse_path, + const std::string& rpc_path, + const std::string& external_url) + : dispatcher_(dispatcher), + message_callbacks_(message_callbacks), + is_server_(is_server), + http_path_(http_path), + http_host_(http_host), + use_sse_(use_sse), + sse_path_(sse_path), + rpc_path_(rpc_path), + external_url_(external_url) {} + +// Out-of-line destructor so the unique_ptr member +// can see the complete type from this translation unit. +HttpSseFilterChainFactory::~HttpSseFilterChainFactory() = default; + bool HttpSseFilterChainFactory::createFilterChain( network::FilterManager& filter_manager) const { // Following production pattern: create filters in order @@ -1025,12 +1345,22 @@ bool HttpSseFilterChainFactory::createFilterChain( // Routing is now integrated into the combined filter // No separate routing filter needed + // Lazily construct the SSE session registry on the first server-side + // filter chain creation. Client-mode factories never touch it, and + // back-compat server-mode factories that don't use the SSE transport + // still carry a registry — it just sits empty because registerSession + // is only called from GET {configured_sse_path_}. + if (is_server_ && !sse_registry_) { + sse_registry_.reset(new SseSessionRegistry(dispatcher_)); + } + // Create the combined protocol filter // Pass the route registration callback so custom HTTP routes can be // registered auto combined_filter = std::make_shared( dispatcher_, message_callbacks_, is_server_, http_path_, http_host_, - use_sse_, route_registration_callback_); + use_sse_, route_registration_callback_, sse_path_, rpc_path_, + external_url_, sse_registry_.get()); // Add as both read and write filter filter_manager.addReadFilter(combined_filter); diff --git a/src/server/mcp_server.cc b/src/server/mcp_server.cc index 8abd2eef..fa4e69ce 100644 --- a/src/server/mcp_server.cc +++ b/src/server/mcp_server.cc @@ -217,9 +217,23 @@ void McpServer::performListen() { // backward compatibility GOPHER_LOG_INFO("Using default HTTP/SSE filter chain factory"); + // Pass the configured endpoint paths (and optional external_url + // for reverse-proxy deployments) through to the filter chain so + // the SSE server transport advertises callbacks under whatever + // paths the operator picked in McpServerConfig. The trailing + // defaults of the ctor cover use_sse and the client-mode + // http_path/http_host — those are unused in server mode but + // still positional in the signature. auto http_sse_factory = std::make_shared( - *main_dispatcher_, *protocol_callbacks_); + *main_dispatcher_, *protocol_callbacks_, + /*is_server=*/true, + /*http_path=*/config_.http_rpc_path, + /*http_host=*/"localhost", + /*use_sse=*/true, + /*sse_path=*/config_.http_sse_path, + /*rpc_path=*/config_.http_rpc_path, + /*external_url=*/config_.external_url); // Add filter factories from config (e.g., auth filters) // This follows the existing FilterFactoryCb pattern