Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ endif()
#Find packages
find_package(Threads REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(CURL REQUIRED)

#Find libevent
find_package(PkgConfig)
Expand Down Expand Up @@ -604,6 +605,7 @@ foreach(lib_target ${REAL_TARGETS})
nlohmann_json::nlohmann_json
${LIBEVENT_LIBRARIES}
yaml-cpp::yaml-cpp
CURL::libcurl
)

# Link logging library - static targets prefer static version to avoid transitive deps
Expand Down
9 changes: 9 additions & 0 deletions include/mcp/client/mcp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,15 @@ class McpClient : public application::ApplicationBase {
std::atomic<bool> connected_{false};
std::string current_uri_;

// Dead connection managers awaiting deferred destruction.
// During reconnect, the old connection manager cannot be destroyed
// immediately because libevent callbacks referencing it may still be
// in-flight on the dispatcher's current event loop iteration. Moved here and
// cleaned up on the next dispatcher iteration via deferredDelete or explicit
// clear.
std::vector<std::unique_ptr<mcp::McpConnectionManager>>
dead_connection_managers_;

// Connection activity tracking for detecting stale connections
std::chrono::steady_clock::time_point last_activity_time_;
static constexpr int kConnectionIdleTimeoutSec =
Expand Down
17 changes: 15 additions & 2 deletions include/mcp/filter/http_sse_filter_chain_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,29 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
* @param http_host HTTP Host header value for client mode
* @param use_sse True for SSE mode (GET /sse first), false for Streamable
* HTTP (direct POST)
* @param sse_path Server-side SSE endpoint path (e.g., "/sse")
* @param rpc_path Server-side JSON-RPC endpoint path (e.g., "/mcp")
* @param external_url External URL for absolute SSE callback URLs (proxy
* support)
*/
HttpSseFilterChainFactory(event::Dispatcher& dispatcher,
McpProtocolCallbacks& message_callbacks,
bool is_server = true,
const std::string& http_path = "/rpc",
const std::string& http_host = "localhost",
bool use_sse = true)
bool use_sse = true,
const std::string& sse_path = "/sse",
const std::string& rpc_path = "/mcp",
const std::string& external_url = "")
: dispatcher_(dispatcher),
message_callbacks_(message_callbacks),
is_server_(is_server),
http_path_(http_path),
http_host_(http_host),
use_sse_(use_sse) {}
use_sse_(use_sse),
sse_path_(sse_path),
rpc_path_(rpc_path),
external_url_(external_url) {}

/**
* Create filter chain for the connection
Expand Down Expand Up @@ -170,6 +180,9 @@ class HttpSseFilterChainFactory : public network::FilterChainFactory {
std::string http_path_; // HTTP request path for client mode
std::string http_host_; // HTTP Host header for client mode
bool use_sse_; // True for SSE mode, false for Streamable HTTP
std::string sse_path_; // Server-side SSE endpoint path (e.g., "/sse")
std::string rpc_path_; // Server-side JSON-RPC endpoint path (e.g., "/mcp")
std::string external_url_; // External URL for absolute SSE callback URLs
mutable bool enable_metrics_ = true; // Enable metrics by default

// Store filters for lifetime management
Expand Down
4 changes: 4 additions & 0 deletions include/mcp/mcp_connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ class McpConnectionManager : public McpProtocolCallbacks,
std::unique_ptr<network::ConnectionManager> connection_manager_;
network::ConnectionPtr active_connection_;

// Closed connections kept alive to prevent use-after-free from
// libevent callbacks that still reference the connection's fd.
std::vector<network::ConnectionPtr> closed_connections_;

// Server listener management
// Must keep listener manager alive for the lifetime of the server
std::unique_ptr<network::ListenerManager> listener_manager_;
Expand Down
9 changes: 8 additions & 1 deletion include/mcp/server/mcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ struct McpServerConfig : public application::ApplicationBase::Config {

// HTTP/SSE specific configuration
std::string http_rpc_path = "/rpc"; // Path for JSON-RPC over HTTP
std::string http_sse_path = "/events"; // Path for SSE event stream
std::string http_sse_path = "/sse"; // Path for SSE event stream
std::string http_health_path = "/health"; // Path for health check endpoint
std::string external_url; // External URL for SSE callbacks (proxy support)

// Session management
size_t max_sessions = 100;
Expand Down Expand Up @@ -938,6 +939,12 @@ class McpServer : public application::ApplicationBase,
// Background task state
std::atomic<bool> background_threads_running_{false};

// Timer handles for background tasks — must outlive the timers they create.
// Without these, createTimer returns a unique_ptr that goes out of scope
// immediately, destroying the timer before it can fire.
event::TimerPtr session_cleanup_timer_;
event::TimerPtr resource_update_timer_;

// Deferred listen address
std::string listen_address_;
bool need_perform_listen_ = false;
Expand Down
11 changes: 7 additions & 4 deletions sdk/typescript/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"@typescript-eslint/parser": "^6.21.0",
"eslint": "^8.0.0",
"jest": "^29.0.0",
"prettier": "^3.6.2",
"prettier": "^3.8.3",
"ts-jest": "^29.0.0",
"tsx": "^4.20.6",
"typescript": "^5.0.0"
Expand Down
35 changes: 29 additions & 6 deletions src/client/mcp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,21 @@ VoidResult McpClient::reconnect() {

// Internal reconnection logic (must be called on dispatcher thread)
VoidResult McpClient::reconnectInternal() {
// Clean up any previously deferred dead connection managers.
// Safe now — we are on a new dispatcher iteration so all in-flight
// callbacks from the previous iteration have completed.
dead_connection_managers_.clear();

// Disconnect first if we think we're connected
if (connected_ || connection_manager_) {
// Close the old connection
// Move the old connection manager to the dead list instead of
// destroying it immediately. Libevent callbacks referencing the
// old connection may still be queued in the current event-loop
// iteration. Destroying it now causes use-after-free / SIGSEGV.
// The dead list is cleared on the next reconnect or shutdown.
if (connection_manager_) {
connection_manager_->close();
connection_manager_.reset();
dead_connection_managers_.push_back(std::move(connection_manager_));
}
connected_ = false;
initialized_ = false;
Expand Down Expand Up @@ -425,6 +434,7 @@ void McpClient::shutdown() {
// Clean up resources
protocol_state_machine_.reset();
connection_manager_.reset();
dead_connection_managers_.clear();
request_tracker_.reset();
circuit_breaker_.reset();

Expand Down Expand Up @@ -571,12 +581,25 @@ std::future<InitializeResult> McpClient::initializeProtocol() {
server_capabilities_ = init_result.capabilities;
initialized_ = true;

// Notify protocol state machine that initialization is complete
if (protocol_state_machine_) {
protocol_state_machine_->handleEvent(
protocol::McpProtocolEvent::INITIALIZED);
// Notify protocol state machine that initialization is complete.
// CRITICAL: Must post to dispatcher thread because the state machine
// timer was created on the dispatcher's event_base. Calling
// cancelStateTimer() → event_del() from this detached thread corrupts
// libevent's internal data structures, causing SIGSEGV ~30 s later
// when the (uncanceled) initialization timeout timer fires.
if (protocol_state_machine_ && main_dispatcher_) {
main_dispatcher_->post([this]() {
if (protocol_state_machine_) {
protocol_state_machine_->handleEvent(
protocol::McpProtocolEvent::INITIALIZED);
}
});
}

// Send notifications/initialized as required by MCP spec
// Must happen BEFORE any other requests (e.g., tools/list)
sendNotification("notifications/initialized", nullopt);

result_promise->set_value(init_result);
}
} catch (...) {
Expand Down
25 changes: 16 additions & 9 deletions src/filter/http_codec_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ HttpCodecFilter::HttpCodecFilter(MessageCallbacks& callbacks,
HttpCodecStateMachineConfig config;
config.is_server = is_server_; // Set mode
config.header_timeout = std::chrono::milliseconds(30000);
config.body_timeout = std::chrono::milliseconds(60000);
// Disable body timeout for client mode. SSE connections keep the
// response body open indefinitely (infinite stream of events).
// A 60 s body timeout would fire and crash in the state machine's
// executeTransition() because the SSE connection is still alive.
config.body_timeout = is_server_
? std::chrono::milliseconds(60000)
: std::chrono::milliseconds(0);
config.idle_timeout = std::chrono::milliseconds(120000);
config.enable_keep_alive = true;
config.state_change_callback =
Expand Down Expand Up @@ -161,7 +167,10 @@ HttpCodecFilter::HttpCodecFilter(const filter::FilterCreationContext& context,
HttpCodecStateMachineConfig sm_config;
sm_config.is_server = is_server_;
sm_config.header_timeout = std::chrono::milliseconds(30000);
sm_config.body_timeout = std::chrono::milliseconds(60000);
// Disable body timeout for client mode — SSE streams never end.
sm_config.body_timeout = is_server_
? std::chrono::milliseconds(60000)
: std::chrono::milliseconds(0);
sm_config.idle_timeout = std::chrono::milliseconds(120000);
sm_config.enable_keep_alive = true;
sm_config.state_change_callback =
Expand Down Expand Up @@ -305,19 +314,17 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
GOPHER_LOG_TRACE("onWrite: Content-Length={} body_preview={}...",
body_length, body_data.substr(0, 50));
response << "Cache-Control: no-cache\r\n";
// Close connection after response. Streamable HTTP JSON-RPC is
// one request → one response. Using Connection: close ensures
// the TCP socket is fully flushed and closed, which prevents
// responses from being stuck in proxy buffers (Traefik, nginx).
response << "Connection: close\r\n";
// CORS headers for browser-based clients (e.g., MCP Inspector)
response << "Access-Control-Allow-Origin: *\r\n";
response << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n";
response
<< "Access-Control-Allow-Headers: Content-Type, Authorization, "
"Accept, Mcp-Session-Id, Mcp-Protocol-Version\r\n";
if (current_stream_) {
response << "Connection: "
<< (current_stream_->keep_alive ? "keep-alive" : "close")
<< "\r\n";
} else {
response << "Connection: keep-alive\r\n";
}
response << "\r\n";
response << body_data;
}
Expand Down
Loading
Loading