Skip to content

Commit d21728e

Browse files
authored
fix: bound inline SSE event size in Streamable HTTP client (#841)
Bound the size of a single inline SSE event parsed from a POST response in `StreamableHttpClientTransport`, so a non-conforming or unbounded server response cannot grow the client's buffer without limit. ## How Has This Been Tested? New unit tests ## Breaking Changes none ## Types of changes - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update ## Checklist - [x] I have read the [MCP Documentation](https://modelcontextprotocol.io) - [x] My code follows the repository's style guidelines - [x] New and existing tests pass locally - [x] I have added appropriate error handling - [x] I have added or updated documentation as needed
1 parent 75a944f commit d21728e

3 files changed

Lines changed: 144 additions & 8 deletions

File tree

kotlin-sdk-client/api/kotlin-sdk-client.api

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTranspor
101101
}
102102

103103
public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractClientTransport {
104-
public fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;)V
105-
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
104+
public fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;ILkotlin/jvm/functions/Function1;)V
105+
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/client/ReconnectionOptions;ILkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
106106
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
107107
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
108108
public final fun getProtocolVersion ()Ljava/lang/String;

kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import io.ktor.http.HttpMethod
2121
import io.ktor.http.HttpStatusCode
2222
import io.ktor.http.contentType
2323
import io.ktor.http.isSuccess
24+
import io.ktor.utils.io.charsets.TooLongLineException
2425
import io.ktor.utils.io.readUTF8Line
2526
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractClientTransport
27+
import io.modelcontextprotocol.kotlin.sdk.shared.TooLongFrameException
2628
import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions
2729
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
2830
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCNotification
@@ -50,6 +52,14 @@ private const val MCP_SESSION_ID_HEADER = "mcp-session-id"
5052
private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version"
5153
private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID"
5254

55+
/**
56+
* Default maximum size, in characters, of a single inline SSE event assembled from a POST response.
57+
*
58+
* Mirrors the stdio transport's 16 MiB frame cap: a server that streams `data:` lines without ever
59+
* terminating the event cannot grow the client's buffer without bound.
60+
*/
61+
private const val DEFAULT_MAX_INLINE_SSE_EVENT_SIZE: Int = 16 * 1024 * 1024
62+
5363
/**
5464
* Represents an error from the Streamable HTTP transport.
5565
*
@@ -75,15 +85,23 @@ private sealed interface ConnectResult {
7585
* @param client Ktor HTTP client used for all requests
7686
* @param url MCP endpoint URL
7787
* @param reconnectionOptions reconnection backoff and retry-limit settings for the SSE stream
88+
* @param maxInlineSseEventSize maximum size, in characters, of a single inline SSE event parsed from a
89+
* POST response; a server that exceeds it (including by never terminating an event) fails the send
90+
* with [io.modelcontextprotocol.kotlin.sdk.shared.TooLongFrameException]. Defaults to 16 MiB.
7891
* @param requestBuilder builder applied to every outgoing HTTP request, e.g. for adding auth headers
7992
*/
8093
public class StreamableHttpClientTransport(
8194
private val client: HttpClient,
8295
private val url: String,
8396
private val reconnectionOptions: ReconnectionOptions = ReconnectionOptions(),
97+
private val maxInlineSseEventSize: Int = DEFAULT_MAX_INLINE_SSE_EVENT_SIZE,
8498
private val requestBuilder: HttpRequestBuilder.() -> Unit = {},
8599
) : AbstractClientTransport() {
86100

101+
init {
102+
require(maxInlineSseEventSize > 0) { "maxInlineSseEventSize must be greater than 0" }
103+
}
104+
87105
@Deprecated(
88106
"Use constructor with ReconnectionOptions",
89107
replaceWith = ReplaceWith(
@@ -98,7 +116,12 @@ public class StreamableHttpClientTransport(
98116
url: String,
99117
reconnectionTime: Duration?,
100118
requestBuilder: HttpRequestBuilder.() -> Unit = {},
101-
) : this(client, url, ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds), requestBuilder)
119+
) : this(
120+
client,
121+
url,
122+
ReconnectionOptions(initialReconnectionDelay = reconnectionTime ?: 1.seconds),
123+
requestBuilder = requestBuilder,
124+
)
102125

103126
override val logger: KLogger = KotlinLogging.logger {}
104127

@@ -458,7 +481,14 @@ public class StreamableHttpClientTransport(
458481
}
459482

460483
while (!channel.isClosedForRead) {
461-
val line = channel.readUTF8Line() ?: break
484+
// Bound each line so a server that streams a line without ever terminating it cannot
485+
// exhaust client memory; readUTF8Line returns null at the end of the stream.
486+
val line = try {
487+
channel.readUTF8Line(maxInlineSseEventSize)
488+
} catch (_: TooLongLineException) {
489+
throw TooLongFrameException(maxInlineSseEventSize.toLong() + 1, maxInlineSseEventSize)
490+
}
491+
if (line == null) break
462492
if (line.isEmpty()) {
463493
dispatch(id = id, eventName = eventName, data = sb.toString())
464494
// reset
@@ -472,7 +502,13 @@ public class StreamableHttpClientTransport(
472502

473503
line.startsWith("event:") -> eventName = line.substringAfter("event:").trim()
474504

475-
line.startsWith("data:") -> sb.append(line.substringAfter("data:").trim())
505+
line.startsWith("data:") -> {
506+
sb.append(line.substringAfter("data:").trim())
507+
// Cap an event assembled from many data: lines that never sees a terminating blank line.
508+
if (sb.length > maxInlineSseEventSize) {
509+
throw TooLongFrameException(sb.length.toLong(), maxInlineSseEventSize)
510+
}
511+
}
476512

477513
line.startsWith("retry:") -> line.substringAfter("retry:").trim().toLongOrNull()?.let {
478514
localServerRetryDelay = it.milliseconds

kotlin-sdk-client/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/streamable/http/StreamableHttpClientTransportTest.kt

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.ktor.http.headersOf
1919
import io.ktor.utils.io.ByteReadChannel
2020
import io.modelcontextprotocol.kotlin.sdk.client.Client
2121
import io.modelcontextprotocol.kotlin.sdk.client.StreamableHttpClientTransport
22+
import io.modelcontextprotocol.kotlin.sdk.shared.TooLongFrameException
2223
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
2324
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
2425
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCNotification
@@ -43,19 +44,27 @@ import kotlin.test.assertEquals
4344
import kotlin.test.assertFailsWith
4445
import kotlin.test.assertNull
4546
import kotlin.test.assertTrue
47+
import kotlin.time.Duration.Companion.milliseconds
4648
import kotlin.time.Duration.Companion.seconds
4749

4850
class StreamableHttpClientTransportTest {
4951

50-
private fun createTransport(handler: MockRequestHandler): StreamableHttpClientTransport {
52+
private fun createTransport(
53+
maxInlineSseEventSize: Int = 16 * 1024 * 1024,
54+
handler: MockRequestHandler,
55+
): StreamableHttpClientTransport {
5156
val mockEngine = MockEngine(handler)
5257
val httpClient = HttpClient(mockEngine) {
5358
install(SSE) {
5459
reconnectionTime = 1.seconds
5560
}
5661
}
5762

58-
return StreamableHttpClientTransport(httpClient, url = "http://localhost:8080/mcp")
63+
return StreamableHttpClientTransport(
64+
httpClient,
65+
url = "http://localhost:8080/mcp",
66+
maxInlineSseEventSize = maxInlineSseEventSize,
67+
)
5968
}
6069

6170
private fun buildSseMessage(id: String, method: String, params: String): String = buildString {
@@ -551,6 +560,97 @@ class StreamableHttpClientTransportTest {
551560
transport.close()
552561
}
553562

563+
@Test
564+
fun testInlineSseRejectsEventExceedingMaxSize() = runTest {
565+
// A malicious server streams an endless single event: many `data:` lines that accumulate
566+
// past the cap and never send the blank-line terminator that would flush the buffer.
567+
val maxInlineSseEventSize = 64
568+
val transport = createTransport(maxInlineSseEventSize) { request ->
569+
if (request.method == HttpMethod.Post) {
570+
val sseContent = buildString {
571+
appendLine("event: message")
572+
// 20 lines × 16 chars = 320 chars of accumulated data, no terminating blank line.
573+
repeat(20) { appendLine("data: ${"A".repeat(16)}") }
574+
}
575+
respond(
576+
content = ByteReadChannel(sseContent),
577+
status = HttpStatusCode.OK,
578+
headers = headersOf(HttpHeaders.ContentType, ContentType.Text.EventStream.toString()),
579+
)
580+
} else {
581+
respond("", HttpStatusCode.OK)
582+
}
583+
}
584+
585+
val receivedMessages = mutableListOf<JSONRPCMessage>()
586+
val receivedErrors = mutableListOf<Throwable>()
587+
transport.onMessage { receivedMessages.add(it) }
588+
transport.onError { receivedErrors.add(it) }
589+
transport.start()
590+
591+
val error = assertFailsWith<McpException> {
592+
transport.send(JSONRPCRequest(id = "req-1", method = "test", params = buildJsonObject { }))
593+
}
594+
595+
error.cause.shouldBeInstanceOf<TooLongFrameException>()
596+
receivedErrors.filterIsInstance<TooLongFrameException>() shouldHaveSize 1
597+
receivedMessages shouldHaveSize 0
598+
transport.close()
599+
}
600+
601+
@Test
602+
fun testInlineSseEventExactlyAtMaxSizeIsAccepted() = runTest {
603+
// An event whose assembled data length equals the cap must still be accepted and dispatched:
604+
// the guard rejects only sizes strictly greater than the cap (parity with ReadBuffer).
605+
val part1 = """{"jsonrpc":"2.0","""
606+
val part2 = """"method":"notifications/tools/list_changed"}"""
607+
val maxInlineSseEventSize = (part1 + part2).length
608+
609+
val transport = createTransport(maxInlineSseEventSize) { request ->
610+
if (request.method == HttpMethod.Post) {
611+
val sseContent = buildString {
612+
appendLine("event: message")
613+
appendLine("data: $part1")
614+
appendLine("data: $part2")
615+
appendLine()
616+
}
617+
respond(
618+
content = ByteReadChannel(sseContent),
619+
status = HttpStatusCode.OK,
620+
headers = headersOf(HttpHeaders.ContentType, ContentType.Text.EventStream.toString()),
621+
)
622+
} else {
623+
respond("", HttpStatusCode.OK)
624+
}
625+
}
626+
627+
val receivedMessages = mutableListOf<JSONRPCMessage>()
628+
val receivedErrors = mutableListOf<Throwable>()
629+
val messageReceived = CompletableDeferred<Unit>()
630+
transport.onMessage {
631+
receivedMessages.add(it)
632+
if (!messageReceived.isCompleted) messageReceived.complete(Unit)
633+
}
634+
transport.onError { receivedErrors.add(it) }
635+
transport.start()
636+
637+
transport.send(JSONRPCRequest(id = "req-1", method = "test", params = buildJsonObject { }))
638+
639+
eventually { messageReceived.await() }
640+
641+
receivedMessages shouldHaveSize 1
642+
(receivedMessages[0] as JSONRPCNotification).method shouldBe "notifications/tools/list_changed"
643+
receivedErrors shouldHaveSize 0
644+
transport.close()
645+
}
646+
647+
@Test
648+
fun testNonPositiveMaxInlineSseEventSizeThrows() {
649+
assertFailsWith<IllegalArgumentException> {
650+
createTransport(maxInlineSseEventSize = 0) { respond("", HttpStatusCode.OK) }
651+
}
652+
}
653+
554654
@Test
555655
fun testInlineSSEInResponse() = runTest {
556656
val transport = createTransport { request ->
@@ -716,7 +816,7 @@ class StreamableHttpClientTransportTest {
716816

717817
eventually {
718818
while (receivedMessages.isEmpty()) {
719-
delay(10)
819+
delay(10.milliseconds)
720820
}
721821
}
722822

0 commit comments

Comments
 (0)