Skip to content

Commit 65069ed

Browse files
committed
Rename session[:stream] to session[:get_sse_stream]
Follow-up to #294. The generic `:stream` key does not convey that it refers to the GET SSE stream, making it harder to distinguish from `session[:post_request_streams]` introduced in #294.
1 parent 40b048b commit 65069ed

File tree

2 files changed

+22
-22
lines changed

2 files changed

+22
-22
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class StreamableHTTPTransport < Transport
1515

1616
def initialize(server, stateless: false, session_idle_timeout: nil)
1717
super(server)
18-
# Maps `session_id` to `{ stream: stream_object, server_session: ServerSession, last_active_at: float_from_monotonic_clock }`.
18+
# Maps `session_id` to `{ get_sse_stream: stream_object, server_session: ServerSession, last_active_at: float_from_monotonic_clock }`.
1919
@sessions = {}
2020
@mutex = Mutex.new
2121

@@ -61,7 +61,7 @@ def close
6161
end
6262

6363
removed_sessions.each do |session|
64-
close_stream_safely(session[:stream])
64+
close_stream_safely(session[:get_sse_stream])
6565
close_post_request_streams(session)
6666
end
6767
end
@@ -113,7 +113,7 @@ def send_notification(method, params = nil, session_id: nil, related_request_id:
113113
failed_sessions = []
114114

115115
@sessions.each do |sid, session|
116-
next unless (stream = session[:stream])
116+
next unless (stream = session[:get_sse_stream])
117117

118118
if session_expired?(session)
119119
failed_sessions << sid
@@ -247,7 +247,7 @@ def reap_expired_sessions
247247
end
248248

249249
removed_sessions.each do |session|
250-
close_stream_safely(session[:stream])
250+
close_stream_safely(session[:get_sse_stream])
251251
close_post_request_streams(session)
252252
end
253253
end
@@ -334,7 +334,7 @@ def cleanup_session(session_id)
334334
end
335335

336336
if session
337-
close_stream_safely(session[:stream])
337+
close_stream_safely(session[:get_sse_stream])
338338
close_post_request_streams(session)
339339
end
340340
end
@@ -358,7 +358,7 @@ def cleanup_session_unsafe(session_id)
358358
def cleanup_and_collect_stream(session_id, streams_to_close)
359359
return unless (removed = cleanup_session_unsafe(session_id))
360360

361-
streams_to_close << removed[:stream]
361+
streams_to_close << removed[:get_sse_stream]
362362
removed[:post_request_streams]&.each_value { |stream| streams_to_close << stream }
363363
end
364364

@@ -449,7 +449,7 @@ def handle_initialization(body_string, body)
449449

450450
@mutex.synchronize do
451451
@sessions[session_id] = {
452-
stream: nil,
452+
get_sse_stream: nil,
453453
server_session: server_session,
454454
last_active_at: Process.clock_gettime(Process::CLOCK_MONOTONIC),
455455
}
@@ -543,7 +543,7 @@ def active_stream(session, related_request_id: nil)
543543
if related_request_id
544544
session.dig(:post_request_streams, related_request_id)
545545
else
546-
session[:stream]
546+
session[:get_sse_stream]
547547
end
548548
end
549549

@@ -572,7 +572,7 @@ def validate_and_touch_session(session_id)
572572
end
573573

574574
if removed
575-
close_stream_safely(removed[:stream])
575+
close_stream_safely(removed[:get_sse_stream])
576576

577577
removed[:post_request_streams]&.each_value do |stream|
578578
close_stream_safely(stream)
@@ -583,7 +583,7 @@ def validate_and_touch_session(session_id)
583583
end
584584

585585
def get_session_stream(session_id)
586-
@mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) }
586+
@mutex.synchronize { @sessions[session_id]&.fetch(:get_sse_stream, nil) }
587587
end
588588

589589
def session_exists?(session_id)
@@ -626,8 +626,8 @@ def create_sse_body(session_id)
626626
def store_stream_for_session(session_id, stream)
627627
@mutex.synchronize do
628628
session = @sessions[session_id]
629-
if session && !session[:stream]
630-
session[:stream] = stream
629+
if session && !session[:get_sse_stream]
630+
session[:get_sse_stream] = stream
631631
else
632632
# Either session was removed, or another request already established a stream.
633633
stream.close
@@ -652,13 +652,13 @@ def start_keepalive_thread(session_id)
652652
end
653653

654654
def session_active_with_stream?(session_id)
655-
@mutex.synchronize { @sessions.key?(session_id) && @sessions[session_id][:stream] }
655+
@mutex.synchronize { @sessions.key?(session_id) && @sessions[session_id][:get_sse_stream] }
656656
end
657657

658658
def send_keepalive_ping(session_id)
659659
@mutex.synchronize do
660-
if @sessions[session_id] && @sessions[session_id][:stream]
661-
send_ping_to_stream(@sessions[session_id][:stream])
660+
if @sessions[session_id] && @sessions[session_id][:get_sse_stream]
661+
send_ping_to_stream(@sessions[session_id][:get_sse_stream])
662662
end
663663
end
664664
rescue *STREAM_WRITE_ERRORS => e

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ def string
346346

347347
# Simulate an active SSE stream by storing a stream object in the session
348348
mock_stream = StringIO.new
349-
@transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
349+
@transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream] = mock_stream
350350

351351
# Attempt a second GET request for the same session
352352
get_request = create_rack_request(
@@ -377,14 +377,14 @@ def string
377377
# Establish stream A
378378
stream_a = StringIO.new
379379
@transport.send(:store_stream_for_session, session_id, stream_a)
380-
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:stream]
380+
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream]
381381

382382
# Attempt to store stream B (simulating a racing request)
383383
stream_b = StringIO.new
384384
@transport.send(:store_stream_for_session, session_id, stream_b)
385385

386386
# Stream A should still be the active stream
387-
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:stream]
387+
assert_equal stream_a, @transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream]
388388

389389
# Stream B should have been closed
390390
assert stream_b.closed?
@@ -928,7 +928,7 @@ def string
928928
end
929929
end
930930

931-
@transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
931+
@transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream] = mock_stream
932932

933933
result = @transport.send_notification("test", { message: "test" }, session_id: session_id)
934934

@@ -973,7 +973,7 @@ def string
973973
# The broken request_stream should be removed.
974974
refute @transport.instance_variable_get(:@sessions)[session_id][:post_request_streams].key?(related_id)
975975
# GET SSE stream should still be intact.
976-
assert @transport.instance_variable_get(:@sessions)[session_id][:stream]
976+
assert @transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream]
977977
end
978978

979979
test "active_stream does not fall back to GET SSE when related_request_id is given but request_stream is missing" do
@@ -2378,7 +2378,7 @@ def string
23782378
mutex.unlock
23792379
end
23802380
end
2381-
transport.instance_variable_get(:@sessions)[session_id][:stream] = mock_stream
2381+
transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream] = mock_stream
23822382

23832383
sleep(0.02) # Wait for session to expire.
23842384

@@ -2495,7 +2495,7 @@ def string
24952495

24962496
# Attach a mock stream to the session
24972497
stream = StringIO.new
2498-
transport.instance_variable_get(:@sessions)[session_id][:stream] = stream
2498+
transport.instance_variable_get(:@sessions)[session_id][:get_sse_stream] = stream
24992499

25002500
# Wait for the session to exceed the idle timeout (0.01s)
25012501
sleep(0.02)

0 commit comments

Comments
 (0)