diff --git a/CMakeLists.txt b/CMakeLists.txt index 2081d5fc..c381ed55 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -157,6 +157,7 @@ endif() #Find packages find_package(Threads REQUIRED) find_package(OpenSSL REQUIRED) +find_package(CURL REQUIRED) #Find libevent find_package(PkgConfig) @@ -604,6 +605,7 @@ foreach(lib_target ${REAL_TARGETS}) nlohmann_json::nlohmann_json ${LIBEVENT_LIBRARIES} yaml-cpp::yaml-cpp + CURL::libcurl ) # Link logging library - static targets prefer static version to avoid transitive deps diff --git a/include/mcp/client/mcp_client.h b/include/mcp/client/mcp_client.h index 34b0cc12..4a23fed9 100644 --- a/include/mcp/client/mcp_client.h +++ b/include/mcp/client/mcp_client.h @@ -586,6 +586,15 @@ class McpClient : public application::ApplicationBase { std::atomic connected_{false}; std::string current_uri_; + // Dead connection managers awaiting deferred destruction. + // During reconnect, the old connection manager cannot be destroyed + // immediately because libevent callbacks referencing it may still be + // in-flight on the dispatcher's current event loop iteration. Moved here and + // cleaned up on the next dispatcher iteration via deferredDelete or explicit + // clear. 
+ std::vector> + dead_connection_managers_; + // Connection activity tracking for detecting stale connections std::chrono::steady_clock::time_point last_activity_time_; static constexpr int kConnectionIdleTimeoutSec = diff --git a/include/mcp/filter/http_sse_filter_chain_factory.h b/include/mcp/filter/http_sse_filter_chain_factory.h index 0f67c9f1..b415a97f 100644 --- a/include/mcp/filter/http_sse_filter_chain_factory.h +++ b/include/mcp/filter/http_sse_filter_chain_factory.h @@ -66,19 +66,29 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { * @param http_host HTTP Host header value for client mode * @param use_sse True for SSE mode (GET /sse first), false for Streamable * HTTP (direct POST) + * @param sse_path Server-side SSE endpoint path (e.g., "/sse") + * @param rpc_path Server-side JSON-RPC endpoint path (e.g., "/mcp") + * @param external_url External URL for absolute SSE callback URLs (proxy + * support) */ HttpSseFilterChainFactory(event::Dispatcher& dispatcher, McpProtocolCallbacks& message_callbacks, bool is_server = true, const std::string& http_path = "/rpc", const std::string& http_host = "localhost", - bool use_sse = true) + bool use_sse = true, + const std::string& sse_path = "/sse", + const std::string& rpc_path = "/mcp", + const std::string& external_url = "") : dispatcher_(dispatcher), message_callbacks_(message_callbacks), is_server_(is_server), http_path_(http_path), http_host_(http_host), - use_sse_(use_sse) {} + use_sse_(use_sse), + sse_path_(sse_path), + rpc_path_(rpc_path), + external_url_(external_url) {} /** * Create filter chain for the connection @@ -170,6 +180,9 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory { std::string http_path_; // HTTP request path for client mode std::string http_host_; // HTTP Host header for client mode bool use_sse_; // True for SSE mode, false for Streamable HTTP + std::string sse_path_; // Server-side SSE endpoint path (e.g., "/sse") + std::string rpc_path_; // 
Server-side JSON-RPC endpoint path (e.g., "/mcp") + std::string external_url_; // External URL for absolute SSE callback URLs mutable bool enable_metrics_ = true; // Enable metrics by default // Store filters for lifetime management diff --git a/include/mcp/mcp_connection_manager.h b/include/mcp/mcp_connection_manager.h index b0bcc8fc..64dd0259 100644 --- a/include/mcp/mcp_connection_manager.h +++ b/include/mcp/mcp_connection_manager.h @@ -199,6 +199,10 @@ class McpConnectionManager : public McpProtocolCallbacks, std::unique_ptr connection_manager_; network::ConnectionPtr active_connection_; + // Closed connections kept alive to prevent use-after-free from + // libevent callbacks that still reference the connection's fd. + std::vector closed_connections_; + // Server listener management // Must keep listener manager alive for the lifetime of the server std::unique_ptr listener_manager_; diff --git a/include/mcp/server/mcp_server.h b/include/mcp/server/mcp_server.h index f67ce5fd..84870905 100644 --- a/include/mcp/server/mcp_server.h +++ b/include/mcp/server/mcp_server.h @@ -88,8 +88,9 @@ struct McpServerConfig : public application::ApplicationBase::Config { // HTTP/SSE specific configuration std::string http_rpc_path = "/rpc"; // Path for JSON-RPC over HTTP - std::string http_sse_path = "/events"; // Path for SSE event stream + std::string http_sse_path = "/sse"; // Path for SSE event stream std::string http_health_path = "/health"; // Path for health check endpoint + std::string external_url; // External URL for SSE callbacks (proxy support) // Session management size_t max_sessions = 100; @@ -938,6 +939,12 @@ class McpServer : public application::ApplicationBase, // Background task state std::atomic background_threads_running_{false}; + // Timer handles for background tasks — must outlive the timers they create. + // Without these, createTimer returns a unique_ptr that goes out of scope + // immediately, destroying the timer before it can fire. 
+ event::TimerPtr session_cleanup_timer_; + event::TimerPtr resource_update_timer_; + // Deferred listen address std::string listen_address_; bool need_perform_listen_ = false; diff --git a/sdk/typescript/package-lock.json b/sdk/typescript/package-lock.json index 25cdd8df..c9a8a2dd 100644 --- a/sdk/typescript/package-lock.json +++ b/sdk/typescript/package-lock.json @@ -20,7 +20,7 @@ "@typescript-eslint/parser": "^6.21.0", "eslint": "^8.0.0", "jest": "^29.0.0", - "prettier": "^3.6.2", + "prettier": "^3.8.3", "ts-jest": "^29.0.0", "tsx": "^4.20.6", "typescript": "^5.0.0" @@ -1939,6 +1939,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-6.21.0.tgz", "integrity": "sha512-oy9+hTPCUFpngkEZUSzbf9MxI65wbKFoQYsgPdILTfbUldp5ovUuphZVe4i30emU9M/kP+T64Di0mxl7dSw3MA==", "dev": true, + "license": "MIT", "dependencies": { "@eslint-community/regexpp": "^4.5.1", "@typescript-eslint/scope-manager": "6.21.0", @@ -1974,6 +1975,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-6.21.0.tgz", "integrity": "sha512-tbsV1jPne5CkFQCgPBcDOt30ItF7aJoZL997JSF7MhGQqOeT3svWRYxiqlfA5RUdlHN6Fi+EI9bxqbdyAUZjYQ==", "dev": true, + "license": "BSD-2-Clause", "dependencies": { "@typescript-eslint/scope-manager": "6.21.0", "@typescript-eslint/types": "6.21.0", @@ -5490,10 +5492,11 @@ } }, "node_modules/prettier": { - "version": "3.6.2", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.6.2.tgz", - "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", + "version": "3.8.3", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.8.3.tgz", + "integrity": "sha512-7igPTM53cGHMW8xWuVTydi2KO233VFiTNyF5hLJqpilHfmn8C8gPf+PS7dUT64YcXFbiMGZxS9pCSxL/Dxm/Jw==", "dev": true, + "license": "MIT", "bin": { "prettier": "bin/prettier.cjs" }, diff --git a/sdk/typescript/package.json b/sdk/typescript/package.json index 8fff2370..9742b723 100644 --- 
a/sdk/typescript/package.json +++ b/sdk/typescript/package.json @@ -49,7 +49,7 @@ "@typescript-eslint/parser": "^6.21.0", "eslint": "^8.0.0", "jest": "^29.0.0", - "prettier": "^3.6.2", + "prettier": "^3.8.3", "ts-jest": "^29.0.0", "tsx": "^4.20.6", "typescript": "^5.0.0" diff --git a/src/client/mcp_client.cc b/src/client/mcp_client.cc index ebd340a0..dc5cbdd9 100644 --- a/src/client/mcp_client.cc +++ b/src/client/mcp_client.cc @@ -332,12 +332,21 @@ VoidResult McpClient::reconnect() { // Internal reconnection logic (must be called on dispatcher thread) VoidResult McpClient::reconnectInternal() { + // Clean up any previously deferred dead connection managers. + // Safe now — we are on a new dispatcher iteration so all in-flight + // callbacks from the previous iteration have completed. + dead_connection_managers_.clear(); + // Disconnect first if we think we're connected if (connected_ || connection_manager_) { - // Close the old connection + // Move the old connection manager to the dead list instead of + // destroying it immediately. Libevent callbacks referencing the + // old connection may still be queued in the current event-loop + // iteration. Destroying it now causes use-after-free / SIGSEGV. + // The dead list is cleared on the next reconnect or shutdown. 
if (connection_manager_) { connection_manager_->close(); - connection_manager_.reset(); + dead_connection_managers_.push_back(std::move(connection_manager_)); } connected_ = false; initialized_ = false; @@ -425,6 +434,7 @@ void McpClient::shutdown() { // Clean up resources protocol_state_machine_.reset(); connection_manager_.reset(); + dead_connection_managers_.clear(); request_tracker_.reset(); circuit_breaker_.reset(); @@ -571,12 +581,25 @@ std::future McpClient::initializeProtocol() { server_capabilities_ = init_result.capabilities; initialized_ = true; - // Notify protocol state machine that initialization is complete - if (protocol_state_machine_) { - protocol_state_machine_->handleEvent( - protocol::McpProtocolEvent::INITIALIZED); + // Notify protocol state machine that initialization is complete. + // CRITICAL: Must post to dispatcher thread because the state machine + // timer was created on the dispatcher's event_base. Calling + // cancelStateTimer() → event_del() from this detached thread corrupts + // libevent's internal data structures, causing SIGSEGV ~30 s later + // when the (uncanceled) initialization timeout timer fires. + if (protocol_state_machine_ && main_dispatcher_) { + main_dispatcher_->post([this]() { + if (protocol_state_machine_) { + protocol_state_machine_->handleEvent( + protocol::McpProtocolEvent::INITIALIZED); + } + }); } + // Send notifications/initialized as required by MCP spec + // Must happen BEFORE any other requests (e.g., tools/list) + sendNotification("notifications/initialized", nullopt); + result_promise->set_value(init_result); } } catch (...) 
{ diff --git a/src/filter/http_codec_filter.cc b/src/filter/http_codec_filter.cc index 4b5787fe..811a7850 100644 --- a/src/filter/http_codec_filter.cc +++ b/src/filter/http_codec_filter.cc @@ -109,7 +109,13 @@ HttpCodecFilter::HttpCodecFilter(MessageCallbacks& callbacks, HttpCodecStateMachineConfig config; config.is_server = is_server_; // Set mode config.header_timeout = std::chrono::milliseconds(30000); - config.body_timeout = std::chrono::milliseconds(60000); + // Disable body timeout for client mode. SSE connections keep the + // response body open indefinitely (infinite stream of events). + // A 60 s body timeout would fire and crash in the state machine's + // executeTransition() because the SSE connection is still alive. + config.body_timeout = is_server_ + ? std::chrono::milliseconds(60000) + : std::chrono::milliseconds(0); config.idle_timeout = std::chrono::milliseconds(120000); config.enable_keep_alive = true; config.state_change_callback = @@ -161,7 +167,10 @@ HttpCodecFilter::HttpCodecFilter(const filter::FilterCreationContext& context, HttpCodecStateMachineConfig sm_config; sm_config.is_server = is_server_; sm_config.header_timeout = std::chrono::milliseconds(30000); - sm_config.body_timeout = std::chrono::milliseconds(60000); + // Disable body timeout for client mode — SSE streams never end. + sm_config.body_timeout = is_server_ + ? std::chrono::milliseconds(60000) + : std::chrono::milliseconds(0); sm_config.idle_timeout = std::chrono::milliseconds(120000); sm_config.enable_keep_alive = true; sm_config.state_change_callback = @@ -305,19 +314,17 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) { GOPHER_LOG_TRACE("onWrite: Content-Length={} body_preview={}...", body_length, body_data.substr(0, 50)); response << "Cache-Control: no-cache\r\n"; + // Close connection after response. Streamable HTTP JSON-RPC is + // one request → one response. 
Using Connection: close ensures + // the TCP socket is fully flushed and closed, which prevents + // responses from being stuck in proxy buffers (Traefik, nginx). + response << "Connection: close\r\n"; // CORS headers for browser-based clients (e.g., MCP Inspector) response << "Access-Control-Allow-Origin: *\r\n"; response << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"; response << "Access-Control-Allow-Headers: Content-Type, Authorization, " "Accept, Mcp-Session-Id, Mcp-Protocol-Version\r\n"; - if (current_stream_) { - response << "Connection: " - << (current_stream_->keep_alive ? "keep-alive" : "close") - << "\r\n"; - } else { - response << "Connection: keep-alive\r\n"; - } response << "\r\n"; response << body_data; } diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc index 8b205218..acab3acc 100644 --- a/src/filter/http_sse_filter_chain_factory.cc +++ b/src/filter/http_sse_filter_chain_factory.cc @@ -50,7 +50,10 @@ #include "mcp/filter/http_sse_filter_chain_factory.h" +#include #include +#include +#include #include #include "mcp/filter/http_codec_filter.h" @@ -66,6 +69,85 @@ namespace mcp { namespace filter { +// ═══════════════════════════════════════════════════════════════════════════ +// SSE Session Registry — routes JSON-RPC responses through SSE streams +// +// When an MCP client connects via SSE transport: +// 1. GET /sse → server assigns session, sends endpoint: callback/{id} +// 2. POST /callback/{id} → server sends 202, routes response via SSE +// +// This registry maps session IDs to their SSE connections so POST handlers +// can route responses through the correct SSE stream. 
+// ═══════════════════════════════════════════════════════════════════════════ +class SseSessionRegistry { + public: + static SseSessionRegistry& instance() { + static SseSessionRegistry registry; + return registry; + } + + // Register an SSE connection and return its unique session ID + std::string registerSession(network::Connection* connection) { + std::lock_guard lock(mutex_); + std::string session_id = "client_" + std::to_string(counter_++); + sessions_[session_id] = connection; + GOPHER_LOG_INFO("SSE session registered: {} (total={})", + session_id, sessions_.size()); + return session_id; + } + + // Remove an SSE session (called on connection close) + void removeSession(const std::string& session_id) { + std::lock_guard lock(mutex_); + sessions_.erase(session_id); + GOPHER_LOG_INFO("SSE session removed: {} (total={})", + session_id, sessions_.size()); + } + + // Send a JSON-RPC response through an SSE session's stream + // Returns true if the session was found and data was written + bool sendResponse(const std::string& session_id, + const std::string& json_data) { + network::Connection* conn = nullptr; + { + std::lock_guard lock(mutex_); + auto it = sessions_.find(session_id); + if (it == sessions_.end()) { + GOPHER_LOG_ERROR("SSE session not found for response: {}", session_id); + return false; + } + conn = it->second; + } + + // Write raw JSON to the SSE connection — the connection's onWrite filter + // will add SSE framing (data: {json}\n\n) automatically since + // is_sse_mode_ is true on that connection's filter instance. + try { + OwnedBuffer buffer; + buffer.add(json_data.c_str(), json_data.length()); + conn->write(buffer, false); + } catch (...) 
{ + GOPHER_LOG_ERROR("SSE write failed for session {}", session_id); + return false; + } + GOPHER_LOG_INFO("SSE response sent via session {} ({} bytes)", + session_id, json_data.size()); + return true; + } + + // Check if a session exists + bool hasSession(const std::string& session_id) { + std::lock_guard lock(mutex_); + return sessions_.find(session_id) != sessions_.end(); + } + + private: + SseSessionRegistry() = default; + std::mutex mutex_; + std::map sessions_; + std::atomic counter_{1}; +}; + // Forward declaration class HttpSseJsonRpcProtocolFilter; @@ -140,13 +222,19 @@ class HttpSseJsonRpcProtocolFilter const std::string& http_path = "/rpc", const std::string& http_host = "localhost", bool use_sse = true, - const HttpRouteRegistrationCallback& route_callback = nullptr) + const HttpRouteRegistrationCallback& route_callback = nullptr, + const std::string& sse_path = "/sse", + const std::string& rpc_path = "/mcp", + const std::string& external_url = "") : dispatcher_(dispatcher), mcp_callbacks_(mcp_callbacks), is_server_(is_server), http_path_(http_path), http_host_(http_host), use_sse_(use_sse), + configured_sse_path_(sse_path), + configured_rpc_path_(rpc_path), + configured_external_url_(external_url), route_registration_callback_(route_callback) { // Following production pattern: all operations for this filter // happen in the single dispatcher thread @@ -188,7 +276,15 @@ class HttpSseJsonRpcProtocolFilter std::make_shared(*this, dispatcher_, is_server_); } - ~HttpSseJsonRpcProtocolFilter() = default; + ~HttpSseJsonRpcProtocolFilter() { + // Clean up SSE session on connection close + // IMPORTANT: Remove from registry BEFORE connection is destroyed + // to prevent other threads from writing to a dead connection + if (!sse_session_id_.empty()) { + SseSessionRegistry::instance().removeSession(sse_session_id_); + sse_session_id_.clear(); + } + } // ===== Network Filter Interface ===== @@ -295,6 +391,30 @@ class HttpSseJsonRpcProtocolFilter * recursion! 
*/ network::FilterStatus onWrite(Buffer& data, bool end_stream) override { + // SSE handshake bypass: pass raw HTTP+SSE response without any framing + if (sse_writing_handshake_) { + return network::FilterStatus::Continue; + } + + // SSE transport: route response through SSE stream instead of POST connection + // When a POST came in on /callback/{session_id}, we already sent 202. + // Now intercept the JSON-RPC response and send it via the SSE stream. + if (is_server_ && !sse_callback_session_id_.empty() && data.length() > 0) { + size_t len = data.length(); + std::string json_data( + static_cast(data.linearize(len)), len); + data.drain(len); + + GOPHER_LOG_INFO("Routing response via SSE session={} ({} bytes)", + sse_callback_session_id_, json_data.size()); + + SseSessionRegistry::instance().sendResponse( + sse_callback_session_id_, json_data); + + // Don't write anything to the POST connection (202 already sent) + return network::FilterStatus::StopIteration; + } + GOPHER_LOG_DEBUG( "HttpSseJsonRpcProtocolFilter: onWrite called, data_len={}, " "is_server={}, is_sse_mode={}, waiting_for_endpoint={}, " @@ -507,21 +627,136 @@ class HttpSseJsonRpcProtocolFilter // Determine transport mode based on headers if (is_server_) { - // Server: check Accept header for SSE - but DON'T enable SSE mode yet! - // The Accept header just indicates the client SUPPORTS SSE, not that we - // should use it. For JSON-RPC request/response (like initialize), we - // should use normal HTTP with Content-Length. - // SSE mode should only be enabled explicitly when the server wants to - // stream notifications. For now, always use normal HTTP for responses. 
- auto accept = headers.find("accept"); - if (accept != headers.end() && - accept->second.find("text/event-stream") != std::string::npos) { - client_accepts_sse_ = true; // Track that client supports SSE - // But don't set is_sse_mode_ = true here - that would break JSON-RPC - GOPHER_LOG_DEBUG("HttpSseJsonRpcProtocolFilter: client accepts SSE"); + // Extract method and path from headers + std::string method = "GET"; + auto method_it = headers.find(":method"); + if (method_it != headers.end()) { + method = method_it->second; + } + + std::string path = "/"; + auto path_it = headers.find(":path"); + if (path_it != headers.end()) { + path = path_it->second; + } else { + auto url_it = headers.find("url"); + if (url_it != headers.end()) { + path = url_it->second; + } + } + // Strip query string for comparison + size_t qpos = path.find('?'); + if (qpos != std::string::npos) { + path = path.substr(0, qpos); + } + + // Log all incoming requests for debugging routing issues + GOPHER_LOG_INFO("HTTP request: {} {} (is_server={})", method, path, is_server_); + + // Check if this is a GET request for the SSE endpoint + if (method == "GET" && path == configured_sse_path_) { + // SSE server mode: open a long-lived SSE stream + GOPHER_LOG_INFO( + "SSE client connected: GET {} (endpoint_event -> {})", + configured_sse_path_, configured_rpc_path_); + + client_accepts_sse_ = true; + + // Register this SSE connection in the session registry + if (write_callbacks_) { + sse_session_id_ = SseSessionRegistry::instance().registerSession( + &write_callbacks_->connection()); + + // Build callback URL for the endpoint event + // If external_url is set (behind proxy), use absolute URL + // Otherwise use relative path for direct connections + std::string callback_path; + if (!configured_external_url_.empty()) { + // Absolute URL: https://domain/v1/mcp/gateways/{id}/callback/client_X + std::string base = configured_external_url_; + // Remove trailing slash if present + if (!base.empty() && 
base.back() == '/') base.pop_back(); + callback_path = base + "/callback/" + sse_session_id_; + } else { + // Relative path: callback/client_X (resolved relative to SSE URL) + callback_path = "callback/" + sse_session_id_; + } + + // Send SSE response headers + endpoint event + // IMPORTANT: Write raw bytes BEFORE setting is_sse_mode_ + std::ostringstream sse_response; + sse_response << "HTTP/1.1 200 OK\r\n"; + sse_response << "Content-Type: text/event-stream\r\n"; + sse_response << "Cache-Control: no-cache\r\n"; + sse_response << "Access-Control-Allow-Origin: *\r\n"; + sse_response << "\r\n"; + sse_response << "event: endpoint\n"; + sse_response << "data: " << callback_path << "\n\n"; + + std::string response_str = sse_response.str(); + OwnedBuffer response_buffer; + response_buffer.add(response_str.c_str(), response_str.length()); + + sse_writing_handshake_ = true; + write_callbacks_->connection().write(response_buffer, false); + sse_writing_handshake_ = false; + + // Enable SSE mode for future writes on this connection + sse_server_mode_ = true; + is_sse_mode_ = true; + sse_headers_written_ = true; + + GOPHER_LOG_INFO( + "SSE stream opened: session={}, callback={}", + sse_session_id_, callback_path); + } else { + GOPHER_LOG_ERROR( + "SSE stream failed: write_callbacks_ is null for GET {}", + configured_sse_path_); + } + return; + } + + // Check if this is a POST to a callback URL (SSE transport) + // Pattern: POST /callback/{session_id} or POST .../callback/{session_id} + // When behind a reverse proxy (Traefik), the path may include a prefix + // from the external URL (e.g., /v1/mcp/gateways/{id}/callback/client_1). + // Use rfind to match "/callback/" anywhere in the path. 
+ std::string callback_prefix = "/callback/"; + auto cb_pos = path.rfind(callback_prefix); + if (method == "POST" && cb_pos != std::string::npos) { + sse_callback_session_id_ = path.substr(cb_pos + callback_prefix.length()); + GOPHER_LOG_INFO("SSE callback POST received: session={}", + sse_callback_session_id_); + + // Send 202 Accepted immediately on the POST connection + // The actual response will be routed through the SSE stream + if (write_callbacks_) { + std::string http_202 = + "HTTP/1.1 202 Accepted\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n"; + OwnedBuffer resp_buf; + resp_buf.add(http_202.c_str(), http_202.length()); + sse_writing_handshake_ = true; + write_callbacks_->connection().write(resp_buf, false); + sse_writing_handshake_ = false; + } + // Don't return — let the JSON-RPC body be processed normally. + // The response will be intercepted in onWrite and routed to SSE. + is_sse_mode_ = false; + } + // Check if this is a POST to the configured RPC path (Streamable HTTP) + // or any other non-SSE request + else { + auto accept = headers.find("accept"); + if (accept != headers.end() && + accept->second.find("text/event-stream") != std::string::npos) { + client_accepts_sse_ = true; + } + is_sse_mode_ = false; } - // Always use normal HTTP mode for request/response pattern - is_sse_mode_ = false; } else { // Client: check Content-Type for SSE auto content_type = headers.find("content-type"); @@ -537,6 +772,10 @@ class HttpSseJsonRpcProtocolFilter } void onBody(const std::string& data, bool end_stream) override { + // SSE server mode: no body processing needed (GET /sse has no body) + if (is_server_ && sse_server_mode_) { + return; + } // Server receives JSON-RPC in request body regardless of SSE mode // SSE mode only affects the response format if (is_server_) { @@ -620,7 +859,7 @@ class HttpSseJsonRpcProtocolFilter return; } - if (event == "message") { + if (event == "message" || event.empty()) { // SSE message event 
contains JSON-RPC message // Forward to JSON-RPC filter auto buffer = std::make_unique(); @@ -820,10 +1059,14 @@ class HttpSseJsonRpcProtocolFilter // Register OPTIONS for common MCP paths routing_filter_->registerHandler("OPTIONS", "/mcp", corsHandler); - routing_filter_->registerHandler("OPTIONS", "/mcp/events", corsHandler); routing_filter_->registerHandler("OPTIONS", "/rpc", corsHandler); routing_filter_->registerHandler("OPTIONS", "/health", corsHandler); routing_filter_->registerHandler("OPTIONS", "/info", corsHandler); + // Register OPTIONS for configured SSE and RPC paths + routing_filter_->registerHandler("OPTIONS", configured_sse_path_, corsHandler); + if (configured_rpc_path_ != "/mcp" && configured_rpc_path_ != "/rpc") { + routing_filter_->registerHandler("OPTIONS", configured_rpc_path_, corsHandler); + } // Register health endpoint routing_filter_->registerHandler( @@ -841,28 +1084,33 @@ class HttpSseJsonRpcProtocolFilter return resp; }); - // Don't register /rpc endpoint - it will pass through to this filter - // Only register endpoints that should be handled by routing filter + // Register ready endpoint (used by K8s readiness probe) + routing_filter_->registerHandler( + "GET", "/ready", [](const HttpRoutingFilter::RequestContext& req) { + HttpRoutingFilter::Response resp; + resp.status_code = 200; + resp.headers["content-type"] = "application/json"; + resp.headers["cache-control"] = "no-cache"; - // Register info endpoint + resp.body = R"({"status":"ready","timestamp":)" + + std::to_string(std::time(nullptr)) + "}"; + + resp.headers["content-length"] = std::to_string(resp.body.length()); + return resp; + }); + + // Register info endpoint - capture configured paths + std::string info_sse_path = configured_sse_path_; + std::string info_rpc_path = configured_rpc_path_; routing_filter_->registerHandler( - "GET", "/info", [](const HttpRoutingFilter::RequestContext& req) { + "GET", "/info", [info_sse_path, info_rpc_path](const 
HttpRoutingFilter::RequestContext& req) { HttpRoutingFilter::Response resp; resp.status_code = 200; resp.headers["content-type"] = "application/json"; resp.headers["Access-Control-Allow-Origin"] = "*"; - resp.body = R"({ - "server": "MCP Server", - "protocols": ["http", "sse", "json-rpc"], - "endpoints": { - "health": "/health", - "info": "/info", - "json_rpc": "/rpc", - "sse_events": "/events" - }, - "version": "1.0.0" - })"; + resp.body = R"({"server":"MCP Server","protocols":["http","sse","json-rpc"],"endpoints":{"health":"/health","info":"/info","json_rpc":")" + + info_rpc_path + R"(","sse":")" + info_sse_path + R"("},"version":"1.0.0"})"; resp.headers["content-length"] = std::to_string(resp.body.length()); return resp; @@ -912,6 +1160,15 @@ class HttpSseJsonRpcProtocolFilter std::string http_host_{"localhost"}; // Default HTTP host for requests bool use_sse_{true}; // True for SSE mode, false for Streamable HTTP + // SSE server transport configuration + std::string configured_sse_path_{"/sse"}; // Server-side SSE endpoint path + std::string configured_rpc_path_{"/mcp"}; // Server-side JSON-RPC endpoint path + std::string configured_external_url_; // External URL for absolute SSE callbacks + bool sse_server_mode_{false}; // True when serving SSE stream to a client + bool sse_writing_handshake_{false}; // True during SSE handshake write (bypass onWrite) + std::string sse_session_id_; // Session ID for this SSE connection (set on GET /sse) + std::string sse_callback_session_id_; // Session ID from POST /callback/{id} (for response routing) + // SSE endpoint negotiation (client mode only) bool waiting_for_sse_endpoint_{false}; // Waiting for "endpoint" SSE event std::vector @@ -1030,7 +1287,8 @@ bool HttpSseFilterChainFactory::createFilterChain( // registered auto combined_filter = std::make_shared( dispatcher_, message_callbacks_, is_server_, http_path_, http_host_, - use_sse_, route_registration_callback_); + use_sse_, route_registration_callback_, sse_path_, 
rpc_path_, + external_url_); // Add as both read and write filter filter_manager.addReadFilter(combined_filter); diff --git a/src/mcp_connection_manager.cc b/src/mcp_connection_manager.cc index 575b63ba..3fa97bc9 100644 --- a/src/mcp_connection_manager.cc +++ b/src/mcp_connection_manager.cc @@ -3,6 +3,9 @@ #include #include #include +#include + +#include #include "mcp/logging/log_macros.h" @@ -861,18 +864,15 @@ void McpConnectionManager::onConnectionEvent(network::ConnectionEvent event) { // Connection closed - clean up state connected_ = false; - // CRITICAL FIX: Defer connection destruction - // We are being called from within the connection's callback loop - // (raiseConnectionEvent). Destroying the connection here would cause - // use-after-free when the callback loop continues to iterate. Use post() to - // defer destruction until after current callback. if (active_connection_) { - auto conn_to_delete = std::make_shared( - std::move(active_connection_)); - dispatcher_.post([conn_to_delete]() { - // Connection is destroyed when lambda and shared_ptr go out of scope - conn_to_delete->reset(); - }); + // Disable read/write on the connection to unregister libevent + // events for this fd. Without this, libevent may fire stale + // callbacks for the fd after we've moved the connection to the + // dead list, causing use-after-free. + active_connection_->readDisable(true); + // Move to dead list instead of destroying — keeps the object + // alive so any remaining in-flight callbacks don't crash. 
+ closed_connections_.push_back(std::move(active_connection_)); } } @@ -915,7 +915,7 @@ void McpConnectionManager::onMessageEndpoint(const std::string& endpoint) { } bool McpConnectionManager::sendHttpPost(const std::string& json_body) { - GOPHER_LOG_DEBUG( + GOPHER_LOG_INFO( "McpConnectionManager::sendHttpPost endpoint={}, body_len={}", message_endpoint_, json_body.length()); @@ -924,188 +924,49 @@ bool McpConnectionManager::sendHttpPost(const std::string& json_body) { return false; } - // Parse endpoint URL to get host, port, path - // Format: https://host:port/path or http://host:port/path - std::string host; - uint16_t port = 443; - std::string path; - bool use_ssl = true; - - size_t proto_end = message_endpoint_.find("://"); - if (proto_end == std::string::npos) { - GOPHER_LOG_ERROR("McpConnectionManager: Invalid endpoint URL"); - return false; - } - - std::string proto = message_endpoint_.substr(0, proto_end); - if (proto == "http") { - use_ssl = false; - port = 80; - } - - size_t host_start = proto_end + 3; - size_t path_start = message_endpoint_.find('/', host_start); - if (path_start == std::string::npos) { - path = "/"; - host = message_endpoint_.substr(host_start); - } else { - path = message_endpoint_.substr(path_start); - host = message_endpoint_.substr(host_start, path_start - host_start); - } - - // Check for port in host - size_t port_pos = host.find(':'); - if (port_pos != std::string::npos) { - port = static_cast(std::stoi(host.substr(port_pos + 1))); - host = host.substr(0, port_pos); - } - - GOPHER_LOG_DEBUG( - "McpConnectionManager: POST to host={}, port={}, path={}, ssl={}", host, - port, path, use_ssl); - - // Resolve hostname - std::string ip_address = resolveHostname(host); - if (ip_address.empty()) { - GOPHER_LOG_ERROR("McpConnectionManager: Failed to resolve hostname: {}", - host); - return false; - } - - // Create HTTP POST request manually - std::ostringstream request; - request << "POST " << path << " HTTP/1.1\r\n"; - request << "Host: 
" << host << "\r\n"; - request << "Content-Type: application/json\r\n"; - request << "Content-Length: " << json_body.length() << "\r\n"; - request << "Connection: close\r\n"; // One-shot connection - request << "\r\n"; - request << json_body; - - std::string request_str = request.str(); - GOPHER_LOG_TRACE( - "McpConnectionManager: HTTP POST request (first 300 chars): {}", - request_str.substr(0, 300)); - - // Create address - auto address = - std::make_shared(ip_address, port); - - // Create stream info - auto stream_info = stream_info::StreamInfoImpl::create(); - - // Create transport socket using the same factory as the main connection - // This ensures proper TCP+SSL handling - auto transport_factory = createTransportSocketFactory(); - if (!transport_factory) { - GOPHER_LOG_ERROR( - "McpConnectionManager: Failed to create transport factory"); - return false; - } - - auto* client_factory = dynamic_cast( - transport_factory.get()); - if (!client_factory) { - GOPHER_LOG_ERROR( - "McpConnectionManager: Transport factory doesn't support client " - "connections"); - return false; - } - auto transport_socket = client_factory->createTransportSocket(nullptr); - - // Create TCP socket using MCP socket interface (same pattern as connect()) - auto local_address = - network::Address::anyAddress(network::Address::IpVersion::v4, 0); - - auto socket_result = socket_interface_.socket( - network::SocketType::Stream, network::Address::Type::Ip, - network::Address::IpVersion::v4, false); - - if (!socket_result.ok()) { - GOPHER_LOG_ERROR("McpConnectionManager: Failed to create socket"); - return false; - } - - // Create IO handle wrapper for the socket - auto io_handle = socket_interface_.ioHandleForFd(*socket_result.value, false); - if (!io_handle) { - socket_interface_.close(*socket_result.value); - GOPHER_LOG_ERROR("McpConnectionManager: Failed to create IO handle"); - return false; - } - - // Create ConnectionSocket wrapper - auto socket_wrapper = std::make_unique( - 
std::move(io_handle), local_address, address); - - // Set socket to non-blocking mode - socket_wrapper->ioHandle().setBlocking(false); - - // Create the connection (same pattern as connect()) - auto post_connection = std::make_unique( - dispatcher_, std::move(socket_wrapper), std::move(transport_socket), - false); // Not yet connected - - auto* post_conn_ptr = post_connection.get(); - - // Simple connection callback that writes the request after connect - class PostConnectionCallbacks : public network::ConnectionCallbacks { - public: - PostConnectionCallbacks(const std::string& request, - network::Connection* conn) - : request_(request), connection_(conn) {} - - void onEvent(network::ConnectionEvent event) override { - GOPHER_LOG_DEBUG("PostConnection onEvent: {}", static_cast(event)); - if (event == network::ConnectionEvent::Connected) { - GOPHER_LOG_DEBUG("PostConnection connected, sending POST request"); - OwnedBuffer buffer; - buffer.add(request_); - connection_->write(buffer, false); - } else if (event == network::ConnectionEvent::RemoteClose || - event == network::ConnectionEvent::LocalClose) { - GOPHER_LOG_DEBUG("PostConnection connection closed"); - // Connection closed - this is expected after we get the response - } + // Use libcurl for the POST request. The raw socket approach fails because: + // 1. EAGAIN on the POST connection's SSL handshake drops the Client Hello + // 2. HTTP/1.1 vs HTTP/2 mismatch with servers that prefer h2 + // curl handles all of this correctly. 
+ std::string endpoint = message_endpoint_; + std::string body = json_body; + + std::thread([endpoint, body]() { + CURL* curl = curl_easy_init(); + if (!curl) { + GOPHER_LOG_ERROR("sendHttpPost: Failed to init curl"); + return; } - void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} - - private: - std::string request_; - network::Connection* connection_; - }; - - // Clean up any previous POST connection - post_connection_.reset(); - post_callbacks_.reset(); - - // Store callbacks as member to keep alive - post_callbacks_ = - std::make_unique(request_str, post_conn_ptr); - post_connection->addConnectionCallbacks(*post_callbacks_); - - // CRITICAL FIX: Initialize the filter manager for the POST connection. - // Without this, the filter manager is in an uninitialized state and - // onRead() returns early without processing, but subsequent code paths - // may still access connection state that hasn't been properly set up, - // leading to crashes. Even though we don't need to parse the HTTP response - // (it's just a 200 OK acknowledgment), we need the filter manager initialized - // for the connection to function correctly. 
- auto* conn_base = - dynamic_cast(post_connection.get()); - if (conn_base) { - conn_base->filterManager().initializeReadFilters(); - } - - // Store the connection as member (keeps it alive) - post_connection_ = std::unique_ptr( - static_cast(post_connection.release())); + struct curl_slist* headers = nullptr; + headers = curl_slist_append(headers, "Content-Type: application/json"); + headers = curl_slist_append(headers, "Accept: application/json"); + + curl_easy_setopt(curl, CURLOPT_URL, endpoint.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.length()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); + // Suppress response body output + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, + +[](void*, size_t size, size_t nmemb, void*) -> size_t { + return size * nmemb; + }); + + CURLcode res = curl_easy_perform(curl); + if (res != CURLE_OK) { + GOPHER_LOG_ERROR("sendHttpPost curl error: {}", curl_easy_strerror(res)); + } else { + long http_code = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code); + GOPHER_LOG_INFO("sendHttpPost: HTTP {}", http_code); + } - // Initiate connection - GOPHER_LOG_DEBUG("McpConnectionManager: Initiating POST connection"); - post_connection_->connect(); + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + }).detach(); return true; } diff --git a/src/server/mcp_server.cc b/src/server/mcp_server.cc index e1060ce3..1e38b5bf 100644 --- a/src/server/mcp_server.cc +++ b/src/server/mcp_server.cc @@ -218,7 +218,14 @@ void McpServer::performListen() { auto http_sse_factory = std::make_shared( - *main_dispatcher_, *protocol_callbacks_); + *main_dispatcher_, *protocol_callbacks_, + true, // is_server + config_.http_rpc_path, // http_path + "localhost", // http_host + true, // use_sse + 
config_.http_sse_path, // sse_path (server-side SSE endpoint) + config_.http_rpc_path, // rpc_path (server-side JSON-RPC endpoint) + config_.external_url); // external_url (for SSE callback URLs) // Add filter factories from config (e.g., auth filters) // This follows the existing FilterFactoryCb pattern @@ -656,16 +663,22 @@ void McpServer::onRequest(const jsonrpc::Request& request) { // Send response through the current connection (for TCP/HTTP connections) // Following production pattern: server sends JSON-RPC, filter handles HTTP if (current_connection_) { - // Convert response to JSON and send through connection - // The filter chain will handle HTTP protocol wrapping auto json_val = json::to_json(response); std::string json_str = json_val.toString(); + GOPHER_LOG_INFO("Writing response ({} bytes) to connection for method={}", + json_str.size(), request.method); + OwnedBuffer response_buffer; response_buffer.add(json_str); - - // Write JSON-RPC response - HTTP filter will wrap it - current_connection_->write(response_buffer, false); + // end_stream=true signals the HTTP codec that this response is complete. + // Without this, keep-alive connections may hold the response in the + // transport layer waiting for more data, causing clients (like Claude) + // to never receive the response even though it was written. 
+ current_connection_->write(response_buffer, true); + } else { + GOPHER_LOG_ERROR("No current_connection_ for response to method={}", + request.method); } // Also try connection managers (for stdio transport) @@ -776,30 +789,40 @@ void McpServer::onConnectionEvent(network::ConnectionEvent event) { // Clean up session for this connection if (session_manager_ && current_connection_) { - // Remove session associated with the closed connection - session_manager_->removeSessionByConnection(current_connection_); - - // Remove from connection-session mapping - // Following production pattern: use lock since this map may be accessed - // from multiple dispatcher threads + auto* closing_connection = current_connection_; + + // Remove the session associated with the closed connection. + // SessionManager::removeSessionByConnection removes the session and + // its internal connection mapping atomically under a single mutex. + // McpServer keeps its own connection_sessions_ mirror as well, so + // the matching entry is also erased below under + // connection_sessions_mutex_. + session_manager_->removeSessionByConnection(closing_connection); { std::lock_guard lock(connection_sessions_mutex_); - connection_sessions_.erase(current_connection_); + connection_sessions_.erase(closing_connection); } - // Remove connection from active list - // Following production pattern: all in dispatcher thread, no mutex - // needed - active_connections_.remove_if( - [this](const network::ConnectionPtr& conn) { - return conn.get() == current_connection_; - }); - - // Clear the current connection + // Clear the current connection BEFORE removing from active list current_connection_ = nullptr; // Decrement connection count num_connections_--; + + // CRITICAL: Defer connection removal from active_connections_ list.
+ // We are inside a connection callback right now — destroying the + // ConnectionPtr here would free the connection object while its + // callback is still on the stack, causing use-after-free / segfault. + // Post the cleanup to the dispatcher so it happens after the callback + // returns. + if (main_dispatcher_) { + main_dispatcher_->post([this, closing_connection]() { + active_connections_.remove_if( + [closing_connection](const network::ConnectionPtr& conn) { + return conn.get() == closing_connection; + }); + }); + } } break; } @@ -1141,44 +1164,57 @@ jsonrpc::Response McpServer::handleGetPrompt(const jsonrpc::Request& request, void McpServer::startBackgroundTasks() { background_threads_running_ = true; - // Schedule periodic session cleanup using dispatcher timer - auto cleanup_timer = main_dispatcher_->createTimer([this]() { - if (background_threads_running_) { - // Clean up expired sessions - session_manager_->cleanupExpiredSessions(); - - // Reschedule for next cleanup - auto next_timer = main_dispatcher_->createTimer([this]() { - if (background_threads_running_) { - startBackgroundTasks(); - } - }); - next_timer->enableTimer(std::chrono::seconds(30)); - } + // Schedule periodic session cleanup. + // The timer handle is stored in session_cleanup_timer_ so it survives + // until the next reschedule or server shutdown. + session_cleanup_timer_ = main_dispatcher_->createTimer([this]() { + if (!background_threads_running_) return; + + // Clean up expired sessions + session_manager_->cleanupExpiredSessions(); + + // Reschedule by creating a new timer stored in the same member. + // The old timer (this one) has already fired and is consumed. 
+ session_cleanup_timer_ = main_dispatcher_->createTimer([this]() { + if (background_threads_running_) { + session_manager_->cleanupExpiredSessions(); + // Re-arm for continuous cleanup + startBackgroundTasks(); + } + }); + session_cleanup_timer_->enableTimer(std::chrono::seconds(30)); }); - cleanup_timer->enableTimer(std::chrono::seconds(30)); + session_cleanup_timer_->enableTimer(std::chrono::seconds(30)); // Schedule periodic resource update notifications - auto update_timer = main_dispatcher_->createTimer([this]() { - if (background_threads_running_) { - // Process pending resource updates for each session - // Get all sessions and send pending updates - - // Reschedule - auto next_timer = main_dispatcher_->createTimer([this]() { - if (background_threads_running_) { - // Continue resource update processing - } - }); - next_timer->enableTimer(config_.resource_update_debounce); - } + resource_update_timer_ = main_dispatcher_->createTimer([this]() { + if (!background_threads_running_) return; + + // Process pending resource updates for each session + // (placeholder — actual update logic can be added here) + + // Reschedule + resource_update_timer_ = main_dispatcher_->createTimer([this]() { + if (background_threads_running_) { + // Continue resource update processing + } + }); + resource_update_timer_->enableTimer(config_.resource_update_debounce); }); - update_timer->enableTimer(config_.resource_update_debounce); + resource_update_timer_->enableTimer(config_.resource_update_debounce); } void McpServer::stopBackgroundTasks() { background_threads_running_ = false; - // Timers will naturally expire and not reschedule + // Explicitly disable and release timers so they don't fire after shutdown + if (session_cleanup_timer_) { + session_cleanup_timer_->disableTimer(); + session_cleanup_timer_.reset(); + } + if (resource_update_timer_) { + resource_update_timer_->disableTimer(); + resource_update_timer_.reset(); + } } // ListenerCallbacks implementation (production 
pattern)