Skip to content

Commit 548abcb

Browse files
committed
fix(server): evict stale GET SSE stream mapping on reconnect
When a GET SSE stream disconnects and the client reconnects, the previous stream's STANDALONE_SSE_STREAM_ID entry in streamsMapping may still be present because cleanup via invokeOnCompletion is asynchronous. The client gets a 200 OK (headers already committed by Ktor's sse {} handler) with an immediately-closed empty stream, causing a retry loop. Replace the hard 409 rejection with a close-and-replace strategy: when a new GET arrives and a STANDALONE_SSE_STREAM_ID entry exists, close the old SSE session and remove the stale mapping before establishing the new stream. Fixes #715
1 parent bf8dc6d commit 548abcb

3 files changed

Lines changed: 280 additions & 7 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package io.modelcontextprotocol.kotlin.sdk.integration.streamablehttp
2+
3+
import io.kotest.matchers.nulls.shouldNotBeNull
4+
import io.kotest.matchers.shouldBe
5+
import io.ktor.client.HttpClient
6+
import io.ktor.client.plugins.sse.SSE
7+
import io.ktor.client.request.header
8+
import io.ktor.client.request.post
9+
import io.ktor.client.request.prepareGet
10+
import io.ktor.client.request.setBody
11+
import io.ktor.client.statement.bodyAsChannel
12+
import io.ktor.http.ContentType
13+
import io.ktor.http.HttpHeaders
14+
import io.ktor.http.HttpStatusCode
15+
import io.ktor.http.contentType
16+
import io.ktor.utils.io.readUTF8Line
17+
import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities
18+
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
19+
import io.modelcontextprotocol.kotlin.sdk.types.InitializeRequest
20+
import io.modelcontextprotocol.kotlin.sdk.types.InitializeRequestParams
21+
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest
22+
import io.modelcontextprotocol.kotlin.sdk.types.LATEST_PROTOCOL_VERSION
23+
import io.modelcontextprotocol.kotlin.sdk.types.toJSON
24+
import io.modelcontextprotocol.kotlin.test.utils.actualPort
25+
import kotlinx.coroutines.Dispatchers
26+
import kotlinx.coroutines.runBlocking
27+
import kotlinx.serialization.json.Json
28+
import kotlin.test.Test
29+
import kotlin.test.assertNotNull
30+
import io.ktor.client.engine.cio.CIO as ClientCIO
31+
32+
private const val SESSION_ID_HEADER = "mcp-session-id"
33+
private const val PROTOCOL_VERSION_HEADER = "mcp-protocol-version"
34+
35+
/**
36+
* Integration tests for GET SSE stream reconnection using a real embedded CIO server.
37+
*
38+
* Verifies that the transport correctly evicts stale STANDALONE_SSE_STREAM_ID
39+
* entries when a client reconnects after a disconnect, rather than silently
40+
* rejecting the new stream.
41+
*/
42+
class StreamableHttpSseReconnectTest : AbstractStreamableHttpIntegrationTest() {
43+
44+
/**
45+
* Verifies that after a GET SSE stream disconnects and the client
46+
* immediately reconnects, the server evicts the stale stream mapping
47+
* and allows the new stream to succeed.
48+
*/
49+
@Test
50+
fun `GET SSE reconnect after disconnect should succeed`(): Unit = runBlocking(Dispatchers.IO) {
51+
var server: StreamableHttpTestServer? = null
52+
var httpClient: HttpClient? = null
53+
54+
try {
55+
server = initTestServer("reconnect-test")
56+
val port = server.ktorServer.actualPort()
57+
val mcpUrl = "http://$URL:$port/mcp"
58+
59+
httpClient = HttpClient(ClientCIO) { install(SSE) }
60+
61+
// Step 1: Initialize session via POST
62+
val initResponse = httpClient.post(mcpUrl) {
63+
contentType(ContentType.Application.Json)
64+
header(
65+
HttpHeaders.Accept,
66+
"${ContentType.Application.Json}, ${ContentType.Text.EventStream}",
67+
)
68+
setBody(Json.encodeToString(buildInitPayload()))
69+
}
70+
initResponse.status shouldBe HttpStatusCode.OK
71+
val sessionId = assertNotNull(initResponse.headers[SESSION_ID_HEADER])
72+
73+
// Step 2: Open GET SSE stream, consume the flush event, then disconnect
74+
httpClient.prepareGet(mcpUrl) {
75+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
76+
header(SESSION_ID_HEADER, sessionId)
77+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
78+
}.execute { response ->
79+
response.status shouldBe HttpStatusCode.OK
80+
response.bodyAsChannel().readUTF8Line()
81+
}
82+
83+
// Step 3: Immediately reconnect. The transport should detect that
84+
// the previous stream's coroutine is cancelled and evict the stale
85+
// mapping, allowing the new stream to succeed.
86+
httpClient.prepareGet(mcpUrl) {
87+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
88+
header(SESSION_ID_HEADER, sessionId)
89+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
90+
}.execute { response ->
91+
response.status shouldBe HttpStatusCode.OK
92+
response.headers[SESSION_ID_HEADER] shouldBe sessionId
93+
94+
val channel = response.bodyAsChannel()
95+
val firstLine = channel.readUTF8Line()
96+
firstLine.shouldNotBeNull()
97+
channel.isClosedForRead shouldBe false
98+
}
99+
} finally {
100+
httpClient?.close()
101+
server?.ktorServer?.stopSuspend(1000, 2000)
102+
}
103+
}
104+
105+
/**
106+
* Verifies that a second concurrent GET SSE stream on the same session
107+
* replaces the first one (last-writer-wins). The new stream should be
108+
* live and the old stream closed.
109+
*/
110+
@Test
111+
fun `concurrent GET SSE stream replaces the previous stream`(): Unit = runBlocking(Dispatchers.IO) {
112+
var server: StreamableHttpTestServer? = null
113+
var httpClient: HttpClient? = null
114+
115+
try {
116+
server = initTestServer("replace-test")
117+
val port = server.ktorServer.actualPort()
118+
val mcpUrl = "http://$URL:$port/mcp"
119+
120+
httpClient = HttpClient(ClientCIO) { install(SSE) }
121+
122+
// Step 1: Initialize session via POST
123+
val initResponse = httpClient.post(mcpUrl) {
124+
contentType(ContentType.Application.Json)
125+
header(
126+
HttpHeaders.Accept,
127+
"${ContentType.Application.Json}, ${ContentType.Text.EventStream}",
128+
)
129+
setBody(Json.encodeToString(buildInitPayload()))
130+
}
131+
initResponse.status shouldBe HttpStatusCode.OK
132+
val sessionId = assertNotNull(initResponse.headers[SESSION_ID_HEADER])
133+
134+
// Step 2: Open first GET SSE stream and keep it open
135+
httpClient.prepareGet(mcpUrl) {
136+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
137+
header(SESSION_ID_HEADER, sessionId)
138+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
139+
}.execute { firstResponse ->
140+
firstResponse.status shouldBe HttpStatusCode.OK
141+
firstResponse.bodyAsChannel().readUTF8Line()
142+
143+
// Step 3: Open a second GET — the new stream replaces the old one
144+
httpClient.prepareGet(mcpUrl) {
145+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
146+
header(SESSION_ID_HEADER, sessionId)
147+
header(PROTOCOL_VERSION_HEADER, LATEST_PROTOCOL_VERSION)
148+
}.execute { secondResponse ->
149+
secondResponse.status shouldBe HttpStatusCode.OK
150+
secondResponse.headers[SESSION_ID_HEADER] shouldBe sessionId
151+
152+
val channel = secondResponse.bodyAsChannel()
153+
val firstLine = channel.readUTF8Line()
154+
firstLine.shouldNotBeNull()
155+
channel.isClosedForRead shouldBe false
156+
}
157+
}
158+
} finally {
159+
httpClient?.close()
160+
server?.ktorServer?.stopSuspend(1000, 2000)
161+
}
162+
}
163+
164+
private fun buildInitPayload(): JSONRPCRequest = InitializeRequest(
165+
InitializeRequestParams(
166+
protocolVersion = LATEST_PROTOCOL_VERSION,
167+
capabilities = ClientCapabilities(),
168+
clientInfo = Implementation(name = "reconnect-test-client", version = "1.0.0"),
169+
),
170+
).toJSON()
171+
}

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,15 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
464464
}
465465
}
466466

467-
if (STANDALONE_SSE_STREAM_ID in streamsMapping) {
468-
call.reject(
469-
HttpStatusCode.Conflict,
470-
RPCError.ErrorCode.CONNECTION_CLOSED,
471-
"Conflict: Only one SSE stream is allowed per session",
472-
)
473-
return
467+
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.let { existingStream ->
468+
// Close the previous SSE stream. This handles both stale streams
469+
// (client disconnected but cleanup hasn't fired) and live streams
470+
// (client opened a new connection, replacing the old one).
471+
try {
472+
existingStream.session?.close()
473+
} catch (_: Exception) {
474+
// Ignore — the old stream may already be closed.
475+
}
474476
}
475477

476478
// SSE headers (Content-Type, Cache-Control, Connection) are already set by the framework's SSE handler

kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,106 @@ class StreamableHttpServerTransportTest {
460460
}
461461
}
462462

463+
@Test
464+
fun `second concurrent GET SSE replaces the first stream`() = testApplication {
465+
val mcpPath = "/mcp"
466+
467+
application {
468+
mcpStreamableHttp(mcpPath) {
469+
Server(
470+
Implementation("test-server", "1.0.0"),
471+
ServerOptions(capabilities = ServerCapabilities()),
472+
)
473+
}
474+
}
475+
476+
val client = createTestClient()
477+
478+
// Step 1: Initialize session via POST
479+
val initResponse = client.post(mcpPath) {
480+
addStreamableHeaders()
481+
setBody(buildInitializeRequestPayload())
482+
}
483+
initResponse.status shouldBe HttpStatusCode.OK
484+
val sessionId = assertNotNull(initResponse.headers[MCP_SESSION_ID_HEADER])
485+
486+
// Step 2: Open first GET SSE stream
487+
client.prepareGet(mcpPath) {
488+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
489+
header(MCP_SESSION_ID_HEADER, sessionId)
490+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
491+
}.execute { firstResponse ->
492+
firstResponse.status shouldBe HttpStatusCode.OK
493+
firstResponse.bodyAsChannel().readUTF8Line()
494+
495+
// Step 3: While the first stream is open, open a second GET.
496+
// The new stream replaces the old one (last-writer-wins).
497+
client.prepareGet(mcpPath) {
498+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
499+
header(MCP_SESSION_ID_HEADER, sessionId)
500+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
501+
}.execute { secondResponse ->
502+
secondResponse.status shouldBe HttpStatusCode.OK
503+
secondResponse.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId
504+
505+
val channel = secondResponse.bodyAsChannel()
506+
val firstLine = channel.readUTF8Line()
507+
firstLine.shouldNotBeNull()
508+
channel.isClosedForRead shouldBe false
509+
}
510+
}
511+
}
512+
513+
@Test
514+
fun `GET SSE reconnect after previous stream disconnects should succeed`() = testApplication {
515+
val mcpPath = "/mcp"
516+
517+
application {
518+
mcpStreamableHttp(mcpPath) {
519+
Server(
520+
Implementation("test-server", "1.0.0"),
521+
ServerOptions(capabilities = ServerCapabilities()),
522+
)
523+
}
524+
}
525+
526+
val client = createTestClient()
527+
528+
// Step 1: Initialize session via POST
529+
val initResponse = client.post(mcpPath) {
530+
addStreamableHeaders()
531+
setBody(buildInitializeRequestPayload())
532+
}
533+
initResponse.status shouldBe HttpStatusCode.OK
534+
val sessionId = assertNotNull(initResponse.headers[MCP_SESSION_ID_HEADER])
535+
536+
// Step 2: Open and then close a GET SSE stream
537+
client.prepareGet(mcpPath) {
538+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
539+
header(MCP_SESSION_ID_HEADER, sessionId)
540+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
541+
}.execute { response ->
542+
response.status shouldBe HttpStatusCode.OK
543+
response.bodyAsChannel().readUTF8Line()
544+
}
545+
546+
// Step 3: Immediately reconnect — the transport should close the stale
547+
// stream and allow the new one.
548+
client.prepareGet(mcpPath) {
549+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
550+
header(MCP_SESSION_ID_HEADER, sessionId)
551+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
552+
}.execute { response ->
553+
response.status shouldBe HttpStatusCode.OK
554+
response.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId
555+
556+
val channel = response.bodyAsChannel()
557+
val firstLine = channel.readUTF8Line()
558+
firstLine.shouldNotBeNull()
559+
channel.isClosedForRead shouldBe false
560+
}
561+
}
562+
463563
@Test
464564
fun `GET SSE stream includes Mcp-Session-Id header and stays open`() = testApplication {
465565
val mcpPath = "/mcp"

0 commit comments

Comments
 (0)