Skip to content

Commit 5dcae8f

Browse files
gophergogogophergogo
authored andcommitted
Extract SseSessionRegistry into its own translation unit (#215)
Pull the SSE session-id-to-connection map out of the HTTP+SSE filter chain factory's .cc and into include/mcp/filter/sse_session_registry.h + src/filter/sse_session_registry.cc. No behavior change; the factory includes the header and uses the class exactly as before. The extraction is driven by testability: as a file-local class the registry could only be exercised indirectly through a full filter chain, which meant there was no narrow unit test for the tiny piece of shared state responsible for routing every POST /callback response. Giving it a real header also makes the class reusable if another MCP transport ever needs the same session-id-to-stream mapping pattern (the registry itself is transport-agnostic). Also add two small introspection accessors (sessionCount, hasSession) that tests use to assert state after a sequence of register/removeSession calls, without reaching into private members.
1 parent 016a390 commit 5dcae8f

4 files changed

Lines changed: 150 additions & 92 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ set(MCP_CORE_SOURCES
449449
src/filter/http_codec_state_machine.cc
450450
src/filter/sse_codec_state_machine.cc
451451
src/filter/http_sse_filter_chain_factory.cc
452+
src/filter/sse_session_registry.cc
452453
src/filter/stdio_filter_chain_factory.cc
453454
src/filter/json_rpc_protocol_filter.cc
454455
src/filter/protocol_detection_filter_chain_factory.cc
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <map>
5+
#include <string>
6+
7+
#include "mcp/event/event_loop.h"
8+
#include "mcp/network/connection.h"
9+
10+
namespace mcp {
11+
namespace filter {
12+
13+
/**
14+
* SseSessionRegistry — dispatcher-owned map of SSE session IDs to the
15+
* network::Connection* streaming SSE back to each client.
16+
*
17+
* MCP SSE transport splits a request/response pair across two TCP
18+
* connections on the server:
19+
* 1. A long-lived GET /sse stream the client leaves open for
20+
* server-sent events. The server registers this connection under a
21+
* fresh session ID and announces a POST callback URL containing
22+
* that ID in the "endpoint" event.
23+
* 2. Short POST /callback/{session_id} connections — one per outbound
24+
* JSON-RPC request. The server returns 202 Accepted immediately and
25+
* routes the JSON-RPC response through the SSE connection registered
26+
* under the matching session ID.
27+
*
28+
* The registry is what lets the POST handler find the SSE connection it
29+
* must route the response through. It is owned by the HTTP+SSE filter
30+
* chain factory (one per McpServer), not a process-wide singleton:
31+
* - Independent McpServer instances in the same process do not share
32+
* session IDs or leak into each other.
33+
* - Lifetime is bounded by the factory, which is owned by McpServer —
34+
* no global state to reason about at shutdown.
35+
*
36+
* Threading model:
37+
* - The MCP server runs on a single dispatcher thread. All filter
38+
* callbacks (onHeaders, onWrite, filter destructor) fire on that
39+
* thread, so registry mutations are naturally single-threaded.
40+
* - Every public method asserts isThreadSafe() so a future move to a
41+
* worker-thread model fails loudly instead of silently corrupting
42+
* the map.
43+
*/
44+
class SseSessionRegistry {
45+
public:
46+
explicit SseSessionRegistry(event::Dispatcher& dispatcher);
47+
48+
// Record an SSE stream connection and hand back a stable session ID.
49+
// Caller must call removeSession() when the stream closes — the SSE
50+
// filter's destructor does this.
51+
std::string registerSession(network::Connection* connection);
52+
53+
// Drop a session. Safe to call with an unknown ID (no-op).
54+
void removeSession(const std::string& session_id);
55+
56+
// Write a JSON-RPC response through the SSE stream registered under
57+
// session_id. Returns true if the session existed and the write was
58+
// handed to the connection (the SSE codec filter further down the
59+
// write chain frames the bytes into a `data: ...\n\n` SSE event).
60+
// Returns false if the session has gone away (e.g. client already
61+
// disconnected); the caller should drop the response rather than
62+
// pretending it was delivered.
63+
bool sendResponse(const std::string& session_id,
64+
const std::string& json_data);
65+
66+
// Test / introspection: current session count. Asserts dispatcher
67+
// thread to match the rest of the API.
68+
size_t sessionCount() const;
69+
70+
// Test / introspection: whether a given ID is currently registered.
71+
bool hasSession(const std::string& session_id) const;
72+
73+
private:
74+
event::Dispatcher& dispatcher_;
75+
std::map<std::string, network::Connection*> sessions_;
76+
uint64_t next_id_{1};
77+
};
78+
79+
} // namespace filter
80+
} // namespace mcp

src/filter/http_sse_filter_chain_factory.cc

Lines changed: 4 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include "mcp/filter/json_rpc_protocol_filter.h"
6363
#include "mcp/filter/metrics_filter.h"
6464
#include "mcp/filter/sse_codec_filter.h"
65+
#include "mcp/filter/sse_session_registry.h"
6566
#include "mcp/json/json_serialization.h"
6667
#include "mcp/logging/log_macros.h"
6768
#include "mcp/mcp_connection_manager.h"
@@ -70,98 +71,9 @@
7071
namespace mcp {
7172
namespace filter {
7273

73-
// ═══════════════════════════════════════════════════════════════════════════
74-
// SseSessionRegistry — dispatcher-owned map of SSE session IDs to the
75-
// network::Connection* that is streaming SSE back to each client.
76-
//
77-
// MCP SSE transport splits the request/response pair across two TCP
78-
// connections:
79-
// 1. A long-lived GET /sse stream that the client leaves open for
80-
// server-sent events. The server registers this connection under a
81-
// fresh session ID and announces a POST callback URL containing
82-
// that ID in the "endpoint" event.
83-
// 2. Short POST /callback/{session_id} connections — one per outbound
84-
// JSON-RPC request from the client. The server returns 202 Accepted
85-
// immediately and then routes the actual JSON-RPC response through
86-
// the SSE connection registered under that session ID.
87-
//
88-
// The registry is what lets the POST handler find the SSE connection it
89-
// has to route the response through. It lives on the filter chain
90-
// factory (one per McpServer), not as a process-wide singleton, so:
91-
// - Independent McpServer instances in the same process do not share
92-
// session IDs or leak into each other.
93-
// - Registry lifetime is bounded by the factory, which is owned by
94-
// McpServer — no global state to reason about at shutdown.
95-
//
96-
// Threading model:
97-
// - The MCP server runs on a single dispatcher thread. All filter
98-
// callbacks (onHeaders, onWrite, filter destructor) fire on that
99-
// thread, so registry mutations are naturally single-threaded.
100-
// - Every public method asserts isThreadSafe() so a future move to a
101-
// worker-thread model fails loudly instead of silently corrupting
102-
// the map.
103-
// ═══════════════════════════════════════════════════════════════════════════
104-
class SseSessionRegistry {
105-
public:
106-
explicit SseSessionRegistry(event::Dispatcher& dispatcher)
107-
: dispatcher_(dispatcher) {}
108-
109-
// Record an SSE stream connection and hand back a stable session ID.
110-
// Caller is responsible for calling removeSession() when the stream
111-
// closes (filter destructor does this).
112-
std::string registerSession(network::Connection* connection) {
113-
assert(dispatcher_.isThreadSafe() &&
114-
"SseSessionRegistry::registerSession off-dispatcher-thread");
115-
std::string session_id = "client_" + std::to_string(next_id_++);
116-
sessions_[session_id] = connection;
117-
GOPHER_LOG_INFO("SSE session registered: {} (total={})", session_id,
118-
sessions_.size());
119-
return session_id;
120-
}
121-
122-
// Drop a session. Safe to call with an unknown ID (no-op). Typically
123-
// invoked from the SSE filter's destructor when the stream connection
124-
// closes.
125-
void removeSession(const std::string& session_id) {
126-
assert(dispatcher_.isThreadSafe() &&
127-
"SseSessionRegistry::removeSession off-dispatcher-thread");
128-
if (sessions_.erase(session_id) > 0) {
129-
GOPHER_LOG_INFO("SSE session removed: {} (total={})", session_id,
130-
sessions_.size());
131-
}
132-
}
133-
134-
// Write a JSON-RPC response through the SSE stream registered under
135-
// session_id. Returns true if the session existed and the write was
136-
// handed to the connection (the actual wire bytes are framed into a
137-
// `data: ...\n\n` SSE event by the SSE codec filter further down that
138-
// connection's write chain). Returns false if the session has gone
139-
// away (e.g. client already disconnected while we were handling the
140-
// POST), in which case the caller should drop the response on the
141-
// floor rather than pretending it was delivered.
142-
bool sendResponse(const std::string& session_id,
143-
const std::string& json_data) {
144-
assert(dispatcher_.isThreadSafe() &&
145-
"SseSessionRegistry::sendResponse off-dispatcher-thread");
146-
auto it = sessions_.find(session_id);
147-
if (it == sessions_.end()) {
148-
GOPHER_LOG_WARN("SSE session not found for response routing: {}",
149-
session_id);
150-
return false;
151-
}
152-
OwnedBuffer buffer;
153-
buffer.add(json_data.c_str(), json_data.length());
154-
it->second->write(buffer, /*end_stream=*/false);
155-
GOPHER_LOG_DEBUG("SSE response routed to session {} ({} bytes)", session_id,
156-
json_data.size());
157-
return true;
158-
}
159-
160-
private:
161-
event::Dispatcher& dispatcher_;
162-
std::map<std::string, network::Connection*> sessions_;
163-
uint64_t next_id_{1};
164-
};
74+
// SseSessionRegistry is defined in mcp/filter/sse_session_registry.h so
75+
// unit tests can exercise it directly without going through the full
76+
// filter chain.
16577

16678
// Forward declaration
16779
class HttpSseJsonRpcProtocolFilter;

src/filter/sse_session_registry.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include "mcp/filter/sse_session_registry.h"
2+
3+
#include <cassert>
4+
5+
#include "mcp/buffer.h"
6+
#include "mcp/logging/log_macros.h"
7+
8+
namespace mcp {
9+
namespace filter {
10+
11+
SseSessionRegistry::SseSessionRegistry(event::Dispatcher& dispatcher)
12+
: dispatcher_(dispatcher) {}
13+
14+
std::string SseSessionRegistry::registerSession(
15+
network::Connection* connection) {
16+
assert(dispatcher_.isThreadSafe() &&
17+
"SseSessionRegistry::registerSession off-dispatcher-thread");
18+
std::string session_id = "client_" + std::to_string(next_id_++);
19+
sessions_[session_id] = connection;
20+
GOPHER_LOG_INFO("SSE session registered: {} (total={})", session_id,
21+
sessions_.size());
22+
return session_id;
23+
}
24+
25+
void SseSessionRegistry::removeSession(const std::string& session_id) {
26+
assert(dispatcher_.isThreadSafe() &&
27+
"SseSessionRegistry::removeSession off-dispatcher-thread");
28+
if (sessions_.erase(session_id) > 0) {
29+
GOPHER_LOG_INFO("SSE session removed: {} (total={})", session_id,
30+
sessions_.size());
31+
}
32+
}
33+
34+
bool SseSessionRegistry::sendResponse(const std::string& session_id,
35+
const std::string& json_data) {
36+
assert(dispatcher_.isThreadSafe() &&
37+
"SseSessionRegistry::sendResponse off-dispatcher-thread");
38+
auto it = sessions_.find(session_id);
39+
if (it == sessions_.end()) {
40+
GOPHER_LOG_WARN("SSE session not found for response routing: {}",
41+
session_id);
42+
return false;
43+
}
44+
OwnedBuffer buffer;
45+
buffer.add(json_data.c_str(), json_data.length());
46+
it->second->write(buffer, /*end_stream=*/false);
47+
GOPHER_LOG_DEBUG("SSE response routed to session {} ({} bytes)", session_id,
48+
json_data.size());
49+
return true;
50+
}
51+
52+
size_t SseSessionRegistry::sessionCount() const {
53+
assert(dispatcher_.isThreadSafe() &&
54+
"SseSessionRegistry::sessionCount off-dispatcher-thread");
55+
return sessions_.size();
56+
}
57+
58+
bool SseSessionRegistry::hasSession(const std::string& session_id) const {
59+
assert(dispatcher_.isThreadSafe() &&
60+
"SseSessionRegistry::hasSession off-dispatcher-thread");
61+
return sessions_.find(session_id) != sessions_.end();
62+
}
63+
64+
} // namespace filter
65+
} // namespace mcp

0 commit comments

Comments
 (0)