Skip to content

Commit 5285277

Browse files
authored
Merge pull request #291 from koic/improvement_close_streams_outside_mutex
Close streams outside mutex in session cleanup
2 parents c563036 + 66428ac commit 5285277

File tree

2 files changed

+121
-35
lines changed

2 files changed

+121
-35
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,12 @@ def close
5050
@reaper_thread&.kill
5151
@reaper_thread = nil
5252

53-
@mutex.synchronize do
54-
@sessions.each_key { |session_id| cleanup_session_unsafe(session_id) }
53+
removed_sessions = @mutex.synchronize do
54+
@sessions.each_key.filter_map { |session_id| cleanup_session_unsafe(session_id) }
55+
end
56+
57+
removed_sessions.each do |session|
58+
close_stream_safely(session[:stream])
5559
end
5660
end
5761

@@ -65,15 +69,17 @@ def send_notification(method, params = nil, session_id: nil)
6569
}
6670
notification[:params] = params if params
6771

68-
@mutex.synchronize do
72+
streams_to_close = []
73+
74+
result = @mutex.synchronize do
6975
if session_id
7076
# Send to specific session
7177
session = @sessions[session_id]
72-
return false unless session && session[:stream]
78+
next false unless session && session[:stream]
7379

7480
if session_expired?(session)
75-
cleanup_session_unsafe(session_id)
76-
return false
81+
cleanup_and_collect_stream(session_id, streams_to_close)
82+
next false
7783
end
7884

7985
begin
@@ -84,7 +90,7 @@ def send_notification(method, params = nil, session_id: nil)
8490
e,
8591
{ session_id: session_id, error: "Failed to send notification" },
8692
)
87-
cleanup_session_unsafe(session_id)
93+
cleanup_and_collect_stream(session_id, streams_to_close)
8894
false
8995
end
9096
else
@@ -113,11 +119,17 @@ def send_notification(method, params = nil, session_id: nil)
113119
end
114120

115121
# Clean up failed sessions
116-
failed_sessions.each { |sid| cleanup_session_unsafe(sid) }
122+
failed_sessions.each { |sid| cleanup_and_collect_stream(sid, streams_to_close) }
117123

118124
sent_count
119125
end
120126
end
127+
128+
streams_to_close.each do |stream|
129+
close_stream_safely(stream)
130+
end
131+
132+
result
121133
end
122134

123135
private
@@ -136,22 +148,16 @@ def start_reaper_thread
136148
# Removes every session that `session_expired?` reports as expired and closes
# the stream of each removed session. Does nothing when `@session_idle_timeout`
# is unset.
#
# This span is a rendered diff: lines that follow an `NNN-` marker appear to be
# the previous implementation, lines that follow an `NNN+` marker the new one.
# In the new version, sessions are removed through `cleanup_session_unsafe`
# while `@mutex` is held, and `close_stream_safely` is only called after the
# `synchronize` block has returned.
def reap_expired_sessions
137149
return unless @session_idle_timeout
138150

139-
expired_streams = @mutex.synchronize do
140-
@sessions.each_with_object([]) do |(session_id, session), streams|
141-
next unless session_expired?(session)
151+
removed_sessions = @mutex.synchronize do
152+
@sessions.each_key.filter_map do |session_id|
153+
# A bare `next` yields nil, which `filter_map` drops, so only the values
# returned by `cleanup_session_unsafe` end up in `removed_sessions`.
next unless session_expired?(@sessions[session_id])
142154

143-
streams << session[:stream] if session[:stream]
144-
@sessions.delete(session_id)
155+
cleanup_session_unsafe(session_id)
145156
end
146157
end
147158

148-
expired_streams.each do |stream|
149-
# Closing outside the mutex is safe because expired sessions are already
150-
# removed from `@sessions` above, so other threads will not find them
151-
# and will not attempt to close the same stream.
152-
stream.close
153-
rescue StandardError
154-
# Ignore close-related errors from already closed/broken streams.
159+
removed_sessions.each do |session|
160+
close_stream_safely(session[:stream])
155161
end
156162
end
157163

@@ -228,23 +234,32 @@ def handle_delete(request)
228234
end
229235

230236
# Removes the session identified by `session_id` and closes its stream.
#
# In the new version (`+` lines) only the removal runs inside
# `@mutex.synchronize`; the value returned by `cleanup_session_unsafe` is kept
# in `session` so that `close_stream_safely` can be called after the lock has
# been released. The close is skipped when nothing was returned.
def cleanup_session(session_id)
231-
@mutex.synchronize do
237+
session = @mutex.synchronize do
232238
cleanup_session_unsafe(session_id)
233239
end
240+
241+
close_stream_safely(session[:stream]) if session
234242
end
235243

244+
# Removes a session from `@sessions` and returns it. Does not close the stream.
245+
# Callers must close the stream outside the mutex to avoid holding the lock during
246+
# potentially blocking I/O.
236247
# Deletes `session_id` from `@sessions` and returns the result of that delete.
# Assuming `@sessions` is a Hash, that is the removed session, or nil when the
# id is not present -- TODO confirm the container type.
#
# This method does not take `@mutex` itself; the callers visible in this diff
# invoke it from inside `@mutex.synchronize`.
#
# The `-` lines below are the previous body, which looked the session up and
# closed its stream inline before deleting it; after this change only the
# `@sessions.delete` line remains.
def cleanup_session_unsafe(session_id)
237-
session = @sessions[session_id]
238-
return unless session
239-
240-
begin
241-
session[:stream]&.close
242-
rescue StandardError
243-
# Ignore close-related errors from already closed/broken streams.
244-
end
245248
@sessions.delete(session_id)
246249
end
247250

251+
# Removes the session via `cleanup_session_unsafe` and appends its `:stream`
# entry to `streams_to_close`, so the caller can close it later.
#
# Returns nil and leaves `streams_to_close` untouched when
# `cleanup_session_unsafe` returns nothing. The appended value is whatever
# `removed[:stream]` holds, which may be nil.
def cleanup_and_collect_stream(session_id, streams_to_close)
252+
return unless (removed = cleanup_session_unsafe(session_id))
253+
254+
streams_to_close << removed[:stream]
255+
end
256+
257+
# Best-effort close: calls `close` on `stream` unless it is nil (`&.`), and
# swallows any StandardError raised by the call.
def close_stream_safely(stream)
258+
stream&.close
259+
rescue StandardError
260+
# Ignore close-related errors from already closed/broken streams.
261+
end
262+
248263
# Returns the value stored under "HTTP_MCP_SESSION_ID" in `request.env`.
# NOTE(review): presumably this is the Rack form of the `Mcp-Session-Id`
# request header -- confirm against the callers.
def extract_session_id(request)
249264
request.env["HTTP_MCP_SESSION_ID"]
250265
end
@@ -357,19 +372,24 @@ def handle_regular_request(body_string, session_id)
357372
end
358373

359374
# Checks that `session_id` refers to a known, non-expired session and refreshes
# its `:last_active_at` timestamp.
#
# Returns `session_not_found_response` when the id is not in `@sessions` or the
# session has expired; returns nil otherwise. When `@session_idle_timeout` is
# unset, the expiry check and the timestamp update are both skipped.
#
# In the new version (`+` lines) the block exits with `next` instead of
# `return`, so its value is captured in `response` and execution continues
# after `synchronize`. An expired session is removed under the lock, and its
# stream is closed with `close_stream_safely` once the lock has been released.
def validate_and_touch_session(session_id)
360-
@mutex.synchronize do
361-
return session_not_found_response unless (session = @sessions[session_id])
362-
return unless @session_idle_timeout
375+
removed = nil
376+
377+
response = @mutex.synchronize do
378+
next session_not_found_response unless (session = @sessions[session_id])
379+
next unless @session_idle_timeout
363380

364381
if session_expired?(session)
365-
cleanup_session_unsafe(session_id)
366-
return session_not_found_response
382+
removed = cleanup_session_unsafe(session_id)
383+
next session_not_found_response
367384
end
368385

369386
session[:last_active_at] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
387+
# Explicit nil so the block's value (and thus `response`) is not the timestamp.
nil
370388
end
371389

372-
nil
390+
close_stream_safely(removed[:stream]) if removed
391+
392+
response
373393
end
374394

375395
def get_session_stream(session_id)

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,37 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
857857
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
858858
end
859859

860+
# Verifies that when writing to a session's stream raises Errno::EPIPE,
# `send_notification` returns a falsy result, removes the session from
# `@sessions`, and calls the stream's `close` while `@mutex` is not locked.
test "send_notification closes stream outside mutex on write error" do
861+
init_request = create_rack_request(
862+
"POST",
863+
"/",
864+
{ "CONTENT_TYPE" => "application/json" },
865+
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
866+
)
867+
init_response = @transport.handle_request(init_request)
868+
session_id = init_response[1]["Mcp-Session-Id"]
869+
870+
# Use a mock stream that verifies mutex is NOT held during close.
871+
mutex = @transport.instance_variable_get(:@mutex)
872+
closed_outside_mutex = false
873+
mock_stream = Object.new
874+
mock_stream.define_singleton_method(:write) { |_data| raise Errno::EPIPE }
875+
mock_stream.define_singleton_method(:close) do
876+
# `try_lock` returns false when the mutex is already locked, so the flag is
# only set if `close` runs while `@mutex` is free.
if mutex.try_lock
877+
closed_outside_mutex = true
878+
mutex.unlock
879+
end
880+
end
881+
882+
@transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
883+
884+
result = @transport.send_notification("test", { message: "test" }, session_id: session_id)
885+
886+
refute result
887+
assert closed_outside_mutex, "Stream should be closed outside the mutex"
888+
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
889+
end
890+
860891
test "send_notification broadcast continues when one session raises Errno::ECONNRESET" do
861892
# Create two sessions.
862893
init_request1 = create_rack_request(
@@ -1613,6 +1644,41 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
16131644
transport.close
16141645
end
16151646

1647+
# Verifies that `reap_expired_sessions` removes an expired session from
# `@sessions` and calls the stream's `close` while `@mutex` is not locked.
# Uses its own transport with a 0.01 s idle timeout and sleeps 0.02 s so the
# session has expired before the reaper is invoked directly.
#
# NOTE(review): if `StreamableHTTPTransport.new` raises, `transport` is still
# nil when the `ensure` clause runs, so `transport.close` would raise
# NoMethodError -- consider `transport&.close`.
test "reap_expired_sessions closes stream outside mutex" do
1648+
transport = StreamableHTTPTransport.new(@server, session_idle_timeout: 0.01)
1649+
1650+
init_request = create_rack_request(
1651+
"POST",
1652+
"/",
1653+
{ "CONTENT_TYPE" => "application/json" },
1654+
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
1655+
)
1656+
init_response = transport.handle_request(init_request)
1657+
session_id = init_response[1]["Mcp-Session-Id"]
1658+
1659+
# Replace the stream with one that verifies mutex is NOT held during close.
1660+
mutex = transport.instance_variable_get(:@mutex)
1661+
closed_outside_mutex = false
1662+
mock_stream = Object.new
1663+
mock_stream.define_singleton_method(:close) do
1664+
# If stream.close runs outside the mutex, try_lock succeeds.
1665+
if mutex.try_lock
1666+
closed_outside_mutex = true
1667+
mutex.unlock
1668+
end
1669+
end
1670+
transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
1671+
1672+
sleep(0.02) # Wait for session to expire.
1673+
1674+
# Invoke the private reaper method directly instead of waiting for the reaper thread.
transport.send(:reap_expired_sessions)
1675+
1676+
assert(closed_outside_mutex, "Stream should be closed outside the mutex")
1677+
assert_empty(transport.instance_variable_get(:@sessions))
1678+
ensure
1679+
transport.close
1680+
end
1681+
16161682
test "close stops the reaper thread" do
16171683
transport = StreamableHTTPTransport.new(@server, session_idle_timeout: 3600)
16181684
reaper_thread = transport.instance_variable_get(:@reaper_thread)

0 commit comments

Comments
 (0)