Skip to content

Commit 8a7f9f8

Browse files
authored
fix(server): GET SSE stream crash on Netty in Streamable HTTP (#681)
`appendSseHeaders()` was called inside Ktor's `sse {}` block where response headers are already committed. On Netty this throws `UnsupportedOperationException`, killing the SSE stream and causing the client to retry in a loop ## How Has This Been Tested? unit, simple-streamable-server sample ## Breaking Changes none ## Types of changes - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update ## Checklist - [x] I have read the [MCP Documentation](https://modelcontextprotocol.io) - [x] My code follows the repository's style guidelines - [x] New and existing tests pass locally - [x ] I have added appropriate error handling - [ x] I have added or updated documentation as needed
1 parent cceca4d commit 8a7f9f8

File tree

3 files changed

+64
-4
lines changed

3 files changed

+64
-4
lines changed

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
package io.modelcontextprotocol.kotlin.sdk.server
44

55
import io.github.oshai.kotlinlogging.KotlinLogging
6+
import io.ktor.http.HttpMethod
67
import io.ktor.http.HttpStatusCode
78
import io.ktor.server.application.Application
89
import io.ktor.server.application.ApplicationCall
10+
import io.ktor.server.application.ApplicationCallPipeline
911
import io.ktor.server.application.MissingApplicationPluginException
1012
import io.ktor.server.application.install
1113
import io.ktor.server.request.ApplicationRequest
1214
import io.ktor.server.request.header
15+
import io.ktor.server.request.httpMethod
16+
import io.ktor.server.response.header
1317
import io.ktor.server.response.respond
1418
import io.ktor.server.routing.Route
1519
import io.ktor.server.routing.RoutingContext
@@ -114,6 +118,16 @@ private fun Application.mcpStreamableHttp(
114118

115119
routing {
116120
route(path) {
121+
// Set Mcp-Session-Id on GET responses before Ktor's sse {} commits headers.
122+
intercept(ApplicationCallPipeline.Plugins) {
123+
if (context.request.httpMethod == HttpMethod.Get) {
124+
val sessionId = context.request.header(MCP_SESSION_ID_HEADER)
125+
if (sessionId != null && transportManager.getTransport(sessionId) != null) {
126+
context.response.header(MCP_SESSION_ID_HEADER, sessionId)
127+
}
128+
}
129+
}
130+
117131
sse {
118132
val transport = existingStreamableTransport(call, transportManager) ?: return@sse
119133
transport.handleRequest(this, call)

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,8 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
453453
return
454454
}
455455

456-
call.appendSseHeaders()
457-
flushSse(sseSession) // flush headers immediately
456+
// SSE headers (Content-Type, Cache-Control, Connection) are already set by the framework's SSE handler
457+
flushSse(sseSession)
458458
streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(sseSession, call)
459459
maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession, call.request.header(MCP_PROTOCOL_VERSION_HEADER))
460460
sseSession.coroutineContext.job.invokeOnCompletion {
@@ -529,8 +529,8 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
529529
}
530530
}
531531

532-
call.appendSseHeaders()
533-
flushSse(session) // flush headers immediately
532+
// SSE headers are already set by the framework's SSE handler.
533+
flushSse(session)
534534

535535
val streamId = store.replayEventsAfter(lastEventId) { eventId, message ->
536536
try {

kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.modelcontextprotocol.kotlin.sdk.server
22

33
import io.kotest.matchers.collections.shouldContainAll
44
import io.kotest.matchers.equals.shouldBeEqual
5+
import io.kotest.matchers.nulls.shouldNotBeNull
56
import io.kotest.matchers.shouldBe
67
import io.ktor.client.HttpClient
78
import io.ktor.client.call.body
@@ -10,7 +11,9 @@ import io.ktor.client.plugins.logging.Logging
1011
import io.ktor.client.request.HttpRequestBuilder
1112
import io.ktor.client.request.header
1213
import io.ktor.client.request.post
14+
import io.ktor.client.request.prepareGet
1315
import io.ktor.client.request.setBody
16+
import io.ktor.client.statement.bodyAsChannel
1417
import io.ktor.http.ContentType
1518
import io.ktor.http.HttpHeaders
1619
import io.ktor.http.HttpStatusCode
@@ -21,6 +24,7 @@ import io.ktor.server.routing.post
2124
import io.ktor.server.routing.routing
2225
import io.ktor.server.testing.ApplicationTestBuilder
2326
import io.ktor.server.testing.testApplication
27+
import io.ktor.utils.io.readUTF8Line
2428
import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities
2529
import io.modelcontextprotocol.kotlin.sdk.types.EmptyResult
2630
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
@@ -36,6 +40,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.ListToolsResult
3640
import io.modelcontextprotocol.kotlin.sdk.types.McpJson
3741
import io.modelcontextprotocol.kotlin.sdk.types.Method
3842
import io.modelcontextprotocol.kotlin.sdk.types.RequestId
43+
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
3944
import io.modelcontextprotocol.kotlin.sdk.types.Tool
4045
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
4146
import io.modelcontextprotocol.kotlin.sdk.types.toJSON
@@ -433,6 +438,47 @@ class StreamableHttpServerTransportTest {
433438
}
434439
}
435440

441+
@Test
442+
fun `GET SSE stream includes Mcp-Session-Id header and stays open`() = testApplication {
443+
val mcpPath = "/mcp"
444+
445+
application {
446+
mcpStreamableHttp(mcpPath) {
447+
Server(
448+
Implementation("test-server", "1.0.0"),
449+
ServerOptions(capabilities = ServerCapabilities()),
450+
)
451+
}
452+
}
453+
454+
val client = createTestClient()
455+
456+
// Step 1: Initialize session via POST
457+
val initResponse = client.post(mcpPath) {
458+
addStreamableHeaders()
459+
setBody(buildInitializeRequestPayload())
460+
}
461+
initResponse.status shouldBe HttpStatusCode.OK
462+
val sessionId = assertNotNull(initResponse.headers[MCP_SESSION_ID_HEADER])
463+
464+
// Step 2: Open GET SSE stream with session ID
465+
client.prepareGet(mcpPath) {
466+
header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
467+
header(MCP_SESSION_ID_HEADER, sessionId)
468+
header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
469+
}.execute { response ->
470+
// Verify Mcp-Session-Id is present on the SSE response
471+
response.status shouldBe HttpStatusCode.OK
472+
response.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId
473+
474+
// Verify the stream is alive by reading at least one line (flush event)
475+
val channel = response.bodyAsChannel()
476+
val firstLine = channel.readUTF8Line()
477+
firstLine.shouldNotBeNull()
478+
channel.isClosedForRead shouldBe false
479+
}
480+
}
481+
436482
private fun ApplicationTestBuilder.configureTransportEndpoint(transport: StreamableHttpServerTransport) {
437483
application {
438484
routing {

0 commit comments

Comments
 (0)