@@ -272,6 +272,63 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
272272 assert_equal "Missing session ID" , body [ "error" ]
273273 end
274274
275+ test "rejects duplicate SSE connection with 409" do
276+ # Create a session
277+ init_request = create_rack_request (
278+ "POST" ,
279+ "/" ,
280+ { "CONTENT_TYPE" => "application/json" } ,
281+ { jsonrpc : "2.0" , method : "initialize" , id : "init" } . to_json ,
282+ )
283+ init_response = @transport . handle_request ( init_request )
284+ session_id = init_response [ 1 ] [ "Mcp-Session-Id" ]
285+
286+ # Simulate an active SSE stream by storing a stream object in the session
287+ mock_stream = StringIO . new
288+ @transport . instance_variable_get ( :@sessions ) [ session_id ] [ :stream ] = mock_stream
289+
290+ # Attempt a second GET request for the same session
291+ get_request = create_rack_request (
292+ "GET" ,
293+ "/" ,
294+ { "HTTP_MCP_SESSION_ID" => session_id } ,
295+ )
296+
297+ response = @transport . handle_request ( get_request )
298+ assert_equal 409 , response [ 0 ]
299+ assert_equal ( { "Content-Type" => "application/json" } , response [ 1 ] )
300+
301+ body = JSON . parse ( response [ 2 ] [ 0 ] )
302+ assert_equal "Conflict: Only one SSE stream is allowed per session" , body [ "error" ]
303+ end
304+
305+ test "store_stream_for_session does not overwrite existing stream (TOCTOU guard)" do
306+ # Create a session
307+ init_request = create_rack_request (
308+ "POST" ,
309+ "/" ,
310+ { "CONTENT_TYPE" => "application/json" } ,
311+ { jsonrpc : "2.0" , method : "initialize" , id : "init" } . to_json ,
312+ )
313+ init_response = @transport . handle_request ( init_request )
314+ session_id = init_response [ 1 ] [ "Mcp-Session-Id" ]
315+
316+ # Establish stream A
317+ stream_a = StringIO . new
318+ @transport . send ( :store_stream_for_session , session_id , stream_a )
319+ assert_equal stream_a , @transport . instance_variable_get ( :@sessions ) [ session_id ] [ :stream ]
320+
321+ # Attempt to store stream B (simulating a racing request)
322+ stream_b = StringIO . new
323+ @transport . send ( :store_stream_for_session , session_id , stream_b )
324+
325+ # Stream A should still be the active stream
326+ assert_equal stream_a , @transport . instance_variable_get ( :@sessions ) [ session_id ] [ :stream ]
327+
328+ # Stream B should have been closed
329+ assert stream_b . closed?
330+ end
331+
275332 test "handles GET request with invalid session ID" do
276333 request = create_rack_request (
277334 "GET" ,
0 commit comments