Skip to content

Commit 1443117

Browse files
committed
fix(server): evict stale GET SSE stream mapping on reconnect
When a GET SSE stream disconnects and the client reconnects, the previous stream's STANDALONE_SSE_STREAM_ID entry in streamsMapping may still be present because cleanup via invokeOnCompletion is asynchronous. The client gets a 200 OK (headers already committed by Ktor's sse {} handler) with an immediately-closed empty stream, causing a retry loop. Changes: - Explicitly close the old SSE session before establishing a new stream. If the old session is already dead this is a no-op. - Replace invokeOnCompletion with try/finally around awaitCancellation() for faster cleanup during cancellation. - Add identity-based removal (=== check) in finally and emitOnStream so old stream cleanup cannot evict a replacement's mapping. - Rethrow CancellationException instead of swallowing it. Fixes #715
1 parent bf8dc6d commit 1443117

3 files changed

Lines changed: 310 additions & 15 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package io.modelcontextprotocol.kotlin.sdk.integration.streamablehttp
2+
3+
import io.kotest.matchers.nulls.shouldNotBeNull
4+
import io.kotest.matchers.shouldBe
5+
import io.ktor.client.HttpClient
6+
import io.ktor.client.plugins.sse.SSE
7+
import io.ktor.client.request.header
8+
import io.ktor.client.request.post
9+
import io.ktor.client.request.prepareGet
10+
import io.ktor.client.request.setBody
11+
import io.ktor.client.statement.bodyAsChannel
12+
import io.ktor.http.ContentType
13+
import io.ktor.http.HttpHeaders
14+
import io.ktor.http.HttpStatusCode
15+
import io.ktor.http.contentType
16+
import io.ktor.utils.io.readUTF8Line
17+
import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities
18+
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
19+
import io.modelcontextprotocol.kotlin.sdk.types.InitializeRequest
20+
import io.modelcontextprotocol.kotlin.sdk.types.InitializeRequestParams
21+
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest
22+
import io.modelcontextprotocol.kotlin.sdk.types.LATEST_PROTOCOL_VERSION
23+
import io.modelcontextprotocol.kotlin.sdk.types.toJSON
24+
import io.modelcontextprotocol.kotlin.test.utils.actualPort
25+
import kotlinx.coroutines.Dispatchers
26+
import kotlinx.coroutines.runBlocking
27+
import kotlinx.serialization.json.Json
28+
import kotlin.test.Test
29+
import kotlin.test.assertNotNull
30+
import io.ktor.client.engine.cio.CIO as ClientCIO
31+
32+
private const val SESSION_ID_HEADER = "mcp-session-id"
33+
private const val PROTOCOL_VERSION_HEADER = "mcp-protocol-version"
34+
35+
/**
36+
* Integration tests for GET SSE stream reconnection using a real embedded CIO server.
37+
*
38+
* Verifies that the transport correctly evicts stale STANDALONE_SSE_STREAM_ID
39+
* entries when a client reconnects after a disconnect, rather than silently
40+
* rejecting the new stream.
41+
*/
42+
class StreamableHttpSseReconnectTest : AbstractStreamableHttpIntegrationTest() {
43+
44+
/**
45+
* Verifies that after a GET SSE stream disconnects and the client
46+
* immediately reconnects, the server evicts the stale stream mapping
47+
* and allows the new stream to succeed.
48+
*/
49+
@Test
50+
fun `GET SSE reconnect after disconnect should succeed`(): Unit = runBlocking(Dispatchers.IO) {
51+
var server: StreamableHttpTestServer? = null
52+
var httpClient: HttpClient? = null
53+
54+
try {
55+
server = initTestServer("reconnect-test")
56+
val port = server.ktorServer.actualPort()
57+
val mcpUrl = "http://$URL:$port/mcp"
58+
59+
httpClient = HttpClient(ClientCIO) { install(SSE) }
60+
61+
// Step 1: Initialize session via POST
62+
val initResponse = httpClient.post(mcpUrl) {
63+
contentType(ContentType.Application.Json)
64+
header(
65+
HttpHeaders.Accept,
66+
"${ContentType.Application.Json}, ${ContentType.Text.EventStream}",
67+
)
68+
setBody(Json.encodeToString(buildInitPayload()))
69+
}
70+
initResponse.status shouldBe HttpStatusCode.OK
71+
val sessionId = assertNotNull(initResponse.headers[SESSION_ID_HEADER])
72+
73+
// Step 2: Open GET SSE stream, consume the flush event, then disconnect
74+
httpClient.prepareGet(mcpUrl) {
75+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
76+
header(SESSION_ID_HEADER, sessionId)
77+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
78+
}.execute { response ->
79+
response.status shouldBe HttpStatusCode.OK
80+
response.bodyAsChannel().readUTF8Line()
81+
}
82+
83+
// Step 3: Immediately reconnect. The transport detects that the
84+
// previous stream's coroutine is no longer active and evicts the
85+
// stale mapping, allowing the new stream to succeed.
86+
httpClient.prepareGet(mcpUrl) {
87+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
88+
header(SESSION_ID_HEADER, sessionId)
89+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
90+
}.execute { response ->
91+
response.status shouldBe HttpStatusCode.OK
92+
response.headers[SESSION_ID_HEADER] shouldBe sessionId
93+
94+
val channel = response.bodyAsChannel()
95+
val firstLine = channel.readUTF8Line()
96+
firstLine.shouldNotBeNull()
97+
channel.isClosedForRead shouldBe false
98+
}
99+
} finally {
100+
httpClient?.close()
101+
server?.ktorServer?.stopSuspend(1000, 2000)
102+
}
103+
}
104+
105+
/**
106+
* Verifies that a second concurrent GET SSE stream on the same session
107+
* closes the old stream and takes over. The new stream should be live.
108+
*/
109+
@Test
110+
fun `concurrent GET SSE stream closes old stream and takes over`(): Unit = runBlocking(Dispatchers.IO) {
111+
var server: StreamableHttpTestServer? = null
112+
var httpClient: HttpClient? = null
113+
114+
try {
115+
server = initTestServer("takeover-test")
116+
val port = server.ktorServer.actualPort()
117+
val mcpUrl = "http://$URL:$port/mcp"
118+
119+
httpClient = HttpClient(ClientCIO) { install(SSE) }
120+
121+
// Step 1: Initialize session via POST
122+
val initResponse = httpClient.post(mcpUrl) {
123+
contentType(ContentType.Application.Json)
124+
header(
125+
HttpHeaders.Accept,
126+
"${ContentType.Application.Json}, ${ContentType.Text.EventStream}",
127+
)
128+
setBody(Json.encodeToString(buildInitPayload()))
129+
}
130+
initResponse.status shouldBe HttpStatusCode.OK
131+
val sessionId = assertNotNull(initResponse.headers[SESSION_ID_HEADER])
132+
133+
// Step 2: Open first GET SSE stream and keep it open
134+
httpClient.prepareGet(mcpUrl) {
135+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
136+
header(SESSION_ID_HEADER, sessionId)
137+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
138+
}.execute { firstResponse ->
139+
firstResponse.status shouldBe HttpStatusCode.OK
140+
firstResponse.bodyAsChannel().readUTF8Line()
141+
142+
// Step 3: Open a second GET — closes old stream, new one takes over
143+
httpClient.prepareGet(mcpUrl) {
144+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
145+
header(SESSION_ID_HEADER, sessionId)
146+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
147+
}.execute { secondResponse ->
148+
secondResponse.status shouldBe HttpStatusCode.OK
149+
secondResponse.headers[SESSION_ID_HEADER] shouldBe sessionId
150+
151+
// New stream is alive
152+
val secondChannel = secondResponse.bodyAsChannel()
153+
val firstLine = secondChannel.readUTF8Line()
154+
firstLine.shouldNotBeNull()
155+
secondChannel.isClosedForRead shouldBe false
156+
}
157+
}
158+
} finally {
159+
httpClient?.close()
160+
server?.ktorServer?.stopSuspend(1000, 2000)
161+
}
162+
}
163+
164+
private fun buildInitPayload(): JSONRPCRequest = InitializeRequest(
165+
InitializeRequestParams(
166+
protocolVersion = LATEST_PROTOCOL_VERSION,
167+
capabilities = ClientCapabilities(),
168+
clientInfo = Implementation(name = "reconnect-test-client", version = "1.0.0"),
169+
),
170+
).toJSON()
171+
}

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -464,27 +464,43 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
464464
}
465465
}
466466

467-
if (STANDALONE_SSE_STREAM_ID in streamsMapping) {
468-
call.reject(
469-
HttpStatusCode.Conflict,
470-
RPCError.ErrorCode.CONNECTION_CLOSED,
471-
"Conflict: Only one SSE stream is allowed per session",
472-
)
473-
return
467+
streamsMapping[STANDALONE_SSE_STREAM_ID]?.let { existingContext ->
468+
// Close the previous SSE session. If the stream is already dead this
469+
// is a no-op. If it is still alive, closing it cancels the coroutine
470+
// blocked in awaitCancellation(), which triggers the identity-guarded
471+
// finally block to remove the mapping.
472+
try {
473+
existingContext.session?.close()
474+
} catch (_: CancellationException) {
475+
throw CancellationException("Cancelled while closing previous SSE stream")
476+
} catch (_: Exception) {
477+
// Ignore — the old stream may already be closed.
478+
}
479+
// After closing, give the old coroutine's finally block a chance to
480+
// remove the mapping. If the entry is still present (race edge case),
481+
// evict it — the old session is closed either way.
482+
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)
474483
}
475484

476485
// SSE headers (Content-Type, Cache-Control, Connection) are already set by the framework's SSE handler
477486
flushSse(sseSession)
478-
streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(sseSession, call)
487+
val newContext = SessionContext(sseSession, call)
488+
streamsMapping[STANDALONE_SSE_STREAM_ID] = newContext
479489
val clientProtocolVersion =
480490
call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
481491
maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession, clientProtocolVersion)
482-
sseSession.coroutineContext.job.invokeOnCompletion {
483-
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)
484-
}
485492
// Keep the SSE connection open until the client disconnects or the transport is closed.
486-
// Without this, the Ktor sse{} handler returns immediately, closing the stream.
487-
awaitCancellation()
493+
// Cleanup uses try/finally (runs during cancellation propagation) instead of
494+
// invokeOnCompletion (runs after job completion) to minimize the window between
495+
// disconnect and mapping removal. Identity check ensures only this stream's entry
496+
// is removed — not a replacement that arrived in the meantime.
497+
try {
498+
awaitCancellation()
499+
} finally {
500+
if (streamsMapping[STANDALONE_SSE_STREAM_ID] === newContext) {
501+
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)
502+
}
503+
}
488504
}
489505

490506
/** Handles an HTTP DELETE request by closing the session and the transport. */
@@ -725,10 +741,17 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
725741
try {
726742
session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message))
727743
} catch (e: CancellationException) {
728-
streamsMapping.remove(streamId)
744+
// Identity-based removal: only evict this stream's entry, not a replacement's.
745+
val current = streamsMapping[streamId]
746+
if (current?.session === session) {
747+
streamsMapping.remove(streamId)
748+
}
729749
throw e
730750
} catch (_: Exception) {
731-
streamsMapping.remove(streamId)
751+
val current = streamsMapping[streamId]
752+
if (current?.session === session) {
753+
streamsMapping.remove(streamId)
754+
}
732755
}
733756
}
734757

kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,107 @@ class StreamableHttpServerTransportTest {
460460
}
461461
}
462462

463+
@Test
464+
fun `second concurrent GET SSE closes old stream and takes over`() = testApplication {
465+
val mcpPath = "/mcp"
466+
467+
application {
468+
mcpStreamableHttp(mcpPath) {
469+
Server(
470+
Implementation("test-server", "1.0.0"),
471+
ServerOptions(capabilities = ServerCapabilities()),
472+
)
473+
}
474+
}
475+
476+
val client = createTestClient()
477+
478+
// Step 1: Initialize session via POST
479+
val initResponse = client.post(mcpPath) {
480+
addStreamableHeaders()
481+
setBody(buildInitializeRequestPayload())
482+
}
483+
initResponse.status shouldBe HttpStatusCode.OK
484+
val sessionId = assertNotNull(initResponse.headers[MCP_SESSION_ID_HEADER])
485+
486+
// Step 2: Open first GET SSE stream
487+
client.prepareGet(mcpPath) {
488+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
489+
header(MCP_SESSION_ID_HEADER, sessionId)
490+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
491+
}.execute { firstResponse ->
492+
firstResponse.status shouldBe HttpStatusCode.OK
493+
firstResponse.bodyAsChannel().readUTF8Line()
494+
495+
// Step 3: Open a second GET — the transport closes the old session
496+
// and the new stream takes over.
497+
client.prepareGet(mcpPath) {
498+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
499+
header(MCP_SESSION_ID_HEADER, sessionId)
500+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
501+
}.execute { secondResponse ->
502+
secondResponse.status shouldBe HttpStatusCode.OK
503+
secondResponse.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId
504+
505+
// New stream is alive
506+
val secondChannel = secondResponse.bodyAsChannel()
507+
val firstLine = secondChannel.readUTF8Line()
508+
firstLine.shouldNotBeNull()
509+
secondChannel.isClosedForRead shouldBe false
510+
}
511+
}
512+
}
513+
514+
@Test
515+
fun `GET SSE reconnect after previous stream disconnects should succeed`() = testApplication {
516+
val mcpPath = "/mcp"
517+
518+
application {
519+
mcpStreamableHttp(mcpPath) {
520+
Server(
521+
Implementation("test-server", "1.0.0"),
522+
ServerOptions(capabilities = ServerCapabilities()),
523+
)
524+
}
525+
}
526+
527+
val client = createTestClient()
528+
529+
// Step 1: Initialize session via POST
530+
val initResponse = client.post(mcpPath) {
531+
addStreamableHeaders()
532+
setBody(buildInitializeRequestPayload())
533+
}
534+
initResponse.status shouldBe HttpStatusCode.OK
535+
val sessionId = assertNotNull(initResponse.headers[MCP_SESSION_ID_HEADER])
536+
537+
// Step 2: Open and then close a GET SSE stream
538+
client.prepareGet(mcpPath) {
539+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
540+
header(MCP_SESSION_ID_HEADER, sessionId)
541+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
542+
}.execute { response ->
543+
response.status shouldBe HttpStatusCode.OK
544+
response.bodyAsChannel().readUTF8Line()
545+
}
546+
547+
// Step 3: Immediately reconnect — the transport should close the stale
548+
// stream and allow the new one.
549+
client.prepareGet(mcpPath) {
550+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
551+
header(MCP_SESSION_ID_HEADER, sessionId)
552+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
553+
}.execute { response ->
554+
response.status shouldBe HttpStatusCode.OK
555+
response.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId
556+
557+
val channel = response.bodyAsChannel()
558+
val firstLine = channel.readUTF8Line()
559+
firstLine.shouldNotBeNull()
560+
channel.isClosedForRead shouldBe false
561+
}
562+
}
563+
463564
@Test
464565
fun `GET SSE stream includes Mcp-Session-Id header and stays open`() = testApplication {
465566
val mcpPath = "/mcp"

0 commit comments

Comments
 (0)