Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ set(MCP_CORE_SOURCES
src/filter/http_codec_state_machine.cc
src/filter/sse_codec_state_machine.cc
src/filter/http_sse_filter_chain_factory.cc
src/filter/sse_session_registry.cc
src/filter/stdio_filter_chain_factory.cc
src/filter/json_rpc_protocol_filter.cc
src/filter/protocol_detection_filter_chain_factory.cc
Expand Down
37 changes: 30 additions & 7 deletions include/mcp/filter/http_sse_filter_chain_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class McpProtocolCallbacks;
namespace filter {
class HttpRoutingFilter;
class MetricsFilter;
class SseSessionRegistry;
} // namespace filter
} // namespace mcp

Expand Down Expand Up @@ -66,19 +67,28 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
* @param http_host HTTP Host header value for client mode
* @param use_sse True for SSE mode (GET /sse first), false for Streamable
* HTTP (direct POST)
* @param sse_path Server-side SSE endpoint path (e.g., "/sse"). Only
* meaningful when is_server=true.
* @param rpc_path Server-side JSON-RPC endpoint path (e.g., "/mcp"). Only
* meaningful when is_server=true.
* @param external_url Absolute URL the server is reachable at from the
* client's perspective. Used to build the endpoint-event
* callback URL advertised on GET /sse. Leave empty to
* derive the URL from the incoming Host header.
*/
HttpSseFilterChainFactory(event::Dispatcher& dispatcher,
McpProtocolCallbacks& message_callbacks,
bool is_server = true,
const std::string& http_path = "/rpc",
const std::string& http_host = "localhost",
bool use_sse = true)
: dispatcher_(dispatcher),
message_callbacks_(message_callbacks),
is_server_(is_server),
http_path_(http_path),
http_host_(http_host),
use_sse_(use_sse) {}
bool use_sse = true,
const std::string& sse_path = "/sse",
const std::string& rpc_path = "/mcp",
const std::string& external_url = "");

// Destructor defined out-of-line so the unique_ptr<SseSessionRegistry>
// member can use the incomplete forward-declared type in this header.
~HttpSseFilterChainFactory();

/**
* Create filter chain for the connection
Expand Down Expand Up @@ -170,8 +180,21 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
std::string http_path_; // HTTP request path for client mode
std::string http_host_; // HTTP Host header for client mode
bool use_sse_; // True for SSE mode, false for Streamable HTTP
std::string sse_path_; // Server-side SSE endpoint path (e.g., "/sse")
std::string rpc_path_; // Server-side JSON-RPC endpoint path (e.g., "/mcp")
std::string external_url_; // External URL for absolute SSE callback URLs
mutable bool enable_metrics_ = true; // Enable metrics by default

// SSE session registry — maps session IDs to the connections that are
// streaming SSE to each client, so POST /callback/{id} handlers can
// route a JSON-RPC response back through the correct stream. Lazily
// constructed on the first server-side filter chain creation; stays
// null for client-mode factories. Owned here (not a global singleton)
// so each McpServer instance has an isolated registry and lifetime is
// bounded by the factory, not process lifetime. Mutable because
// createFilterChain is const on the base class.
mutable std::unique_ptr<SseSessionRegistry> sse_registry_;

// Store filters for lifetime management
mutable std::vector<network::FilterSharedPtr> filters_;

Expand Down
80 changes: 80 additions & 0 deletions include/mcp/filter/sse_session_registry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#pragma once

#include <cstdint>
#include <map>
#include <string>

#include "mcp/event/event_loop.h"
#include "mcp/network/connection.h"

namespace mcp {
namespace filter {

/**
* SseSessionRegistry — dispatcher-owned map of SSE session IDs to the
* network::Connection* streaming SSE back to each client.
*
* MCP SSE transport splits a request/response pair across two TCP
* connections on the server:
* 1. A long-lived GET /sse stream the client leaves open for
* server-sent events. The server registers this connection under a
* fresh session ID and announces a POST callback URL containing
* that ID in the "endpoint" event.
* 2. Short POST /callback/{session_id} connections — one per outbound
* JSON-RPC request. The server returns 202 Accepted immediately and
* routes the JSON-RPC response through the SSE connection registered
* under the matching session ID.
*
* The registry is what lets the POST handler find the SSE connection it
* must route the response through. It is owned by the HTTP+SSE filter
* chain factory (one per McpServer), not a process-wide singleton:
* - Independent McpServer instances in the same process do not share
* session IDs or leak into each other.
* - Lifetime is bounded by the factory, which is owned by McpServer —
* no global state to reason about at shutdown.
*
* Threading model:
* - The MCP server runs on a single dispatcher thread. All filter
* callbacks (onHeaders, onWrite, filter destructor) fire on that
* thread, so registry mutations are naturally single-threaded.
* - Every public method asserts isThreadSafe() so a future move to a
* worker-thread model fails loudly instead of silently corrupting
* the map.
*/
class SseSessionRegistry {
public:
explicit SseSessionRegistry(event::Dispatcher& dispatcher);

// Record an SSE stream connection and hand back a stable session ID.
// Caller must call removeSession() when the stream closes — the SSE
// filter's destructor does this.
std::string registerSession(network::Connection* connection);

// Drop a session. Safe to call with an unknown ID (no-op).
void removeSession(const std::string& session_id);

// Write a JSON-RPC response through the SSE stream registered under
// session_id. Returns true if the session existed and the write was
// handed to the connection (the SSE codec filter further down the
// write chain frames the bytes into a `data: ...\n\n` SSE event).
// Returns false if the session has gone away (e.g. client already
// disconnected); the caller should drop the response rather than
// pretending it was delivered.
bool sendResponse(const std::string& session_id,
const std::string& json_data);

// Test / introspection: current session count. Asserts dispatcher
// thread to match the rest of the API.
size_t sessionCount() const;

// Test / introspection: whether a given ID is currently registered.
bool hasSession(const std::string& session_id) const;

private:
event::Dispatcher& dispatcher_;
std::map<std::string, network::Connection*> sessions_;
uint64_t next_id_{1};
};

} // namespace filter
} // namespace mcp
9 changes: 8 additions & 1 deletion include/mcp/server/mcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,15 @@ struct McpServerConfig : public application::ApplicationBase::Config {

// HTTP/SSE specific configuration
std::string http_rpc_path = "/rpc"; // Path for JSON-RPC over HTTP
std::string http_sse_path = "/events"; // Path for SSE event stream
std::string http_sse_path = "/sse"; // Path for SSE event stream
std::string http_health_path = "/health"; // Path for health check endpoint
// Absolute URL the server is reachable at from the client's perspective
// (scheme + host + port + optional path prefix). Used to build the
// endpoint-event callback URL advertised on GET /sse. Leave empty to
// have the server derive a URL from the incoming Host header; set
// explicitly when the server sits behind a reverse proxy that rewrites
// scheme or path so clients don't try to POST back to an internal URL.
std::string external_url;

// Session management
size_t max_sessions = 100;
Expand Down
Loading
Loading