11package io.modelcontextprotocol.kotlin.sdk.client
22
3+ import io.kotest.assertions.nondeterministic.eventually
34import io.kotest.matchers.collections.shouldContain
45import io.kotest.matchers.shouldBe
56import io.ktor.http.ContentType
@@ -9,17 +10,22 @@ import io.ktor.sse.ServerSentEvent
910import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities
1011import io.modelcontextprotocol.kotlin.sdk.types.EmptyJsonObject
1112import io.modelcontextprotocol.kotlin.sdk.types.Implementation
13+ import io.modelcontextprotocol.kotlin.sdk.types.Method
14+ import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification
15+ import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken
1216import io.modelcontextprotocol.kotlin.sdk.types.Tool
1317import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
18+ import kotlinx.coroutines.CompletableDeferred
19+ import kotlinx.coroutines.awaitCancellation
1420import kotlinx.coroutines.delay
1521import kotlinx.coroutines.flow.emptyFlow
1622import kotlinx.coroutines.flow.flow
17- import kotlinx.coroutines.flow.flowOf
1823import kotlinx.coroutines.runBlocking
1924import kotlinx.serialization.json.buildJsonObject
2025import kotlinx.serialization.json.put
2126import kotlinx.serialization.json.putJsonObject
2227import org.junit.jupiter.api.TestInstance
28+ import java.util.concurrent.CopyOnWriteArrayList
2329import kotlin.test.Test
2430import kotlin.time.Duration.Companion.milliseconds
2531import kotlin.time.Duration.Companion.seconds
@@ -46,28 +52,28 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
4652 mockMcp.onJSONRPCRequest(
4753 httpMethod = HttpMethod .Post ,
4854 jsonRpcMethod = " initialize" ,
49- ).respondsWithStream {
55+ ) respondsWithSseStream {
5056 headers + = MCP_SESSION_ID_HEADER to sessionId
51- flow = flowOf(
52- " id: ${ Uuid .random()} \n " ,
53- " data: \n " , // empty data
54- " \n " ,
55- " id: ${ Uuid .random()} \n " ,
56- " data: \t \n " , // tabs and spaces
57- " \n " ,
58- " id: ${ Uuid .random()} \n " ,
59- " event: message \n " ,
60- // multiline data
61- " data: { \n " ,
62- " data: \ " result\ " :{\n " +
63- " data: \ " protocolVersion\" : \ " 2025-06-18\" , \n " +
64- " data: \ " capabilities\ " :{},\n " +
65- " data: \ " serverInfo\ " :{\ " name\" : \ " simple-streamable-http-server\" , \ " version\" : \ " 1.0.0\" } \n " +
66- " data: }, \n " +
67- " data: \" jsonrpc\" : \ " 2.0\" , \n " +
68- " data: \" id \" : \ " 7ce065b0678f49e5b04ce5a0fcc7d518\"\n " +
69- " data: } \n " ,
70- " \n " ,
57+ // empty data — should be skipped by client
58+ chunks + = ServerSentEvent (data = " " , id = Uuid .random().toString())
59+ // whitespace-only data — should be skipped by client
60+ chunks + = ServerSentEvent (data = " \t " , id = Uuid .random().toString())
61+ // valid initialize response with multiline JSON
62+ @Suppress( " MaxLineLength " )
63+ chunks + = ServerSentEvent (
64+ event = " message " ,
65+ id = Uuid .random().toString() ,
66+ // language=json
67+ data = """ {
68+ | "result":{
69+ | "protocolVersion": "2025-06-18",
70+ | "capabilities":{},
71+ | "serverInfo":{"name": "simple-streamable-http-server", "version": "1.0.0"}
72+ |},
73+ |" jsonrpc": "2.0",
74+ |"id": "7ce065b0678f49e5b04ce5a0fcc7d518"
75+ |}
76+ | """ .trimMargin() ,
7177 )
7278 }
7379
@@ -111,30 +117,30 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
111117 statusCode = HttpStatusCode .Accepted ,
112118 )
113119
120+ @Suppress(" MaxLineLength" )
114121 mockMcp.handleSubscribeWithGet(sessionId) {
115122 flow {
116- delay(500 .milliseconds)
117- emit(
118- ServerSentEvent (
119- event = " message" ,
120- id = " 1" ,
121- data = @Suppress(" MaxLineLength" )
122- // language=json
123- """ {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""" ,
124- ),
125- )
126- delay(200 .milliseconds)
127- emit(
128- ServerSentEvent (
129- data = @Suppress(" MaxLineLength" )
130- // language=json
131- """ {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""" ,
132- ),
133- )
123+ for (i in 0 .. 10 ) {
124+ delay(100 .milliseconds)
125+ emit(
126+ ServerSentEvent (
127+ event = " message" ,
128+ id = " 1" ,
129+ data =
130+ // language=json
131+ """ {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":${i * 10 } ,"total":100}}""" ,
132+ ),
133+ )
134+ }
135+ awaitCancellation()
134136 }
135137 }
136138
137- // TODO: how to get notifications via Client API?
139+ val receivedNotifications = CopyOnWriteArrayList <ProgressNotification >()
140+ client.setNotificationHandler<ProgressNotification >(Method .Defined .NotificationsProgress ) {
141+ receivedNotifications.add(it)
142+ CompletableDeferred (Unit )
143+ }
138144
139145 mockMcp.handleWithResult(
140146 jsonRpcMethod = " tools/list" ,
@@ -176,6 +182,16 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
176182
177183 connect(client)
178184
185+ eventually(5 .seconds) {
186+ receivedNotifications.size shouldBe 11 // 0..100 with step 10
187+
188+ receivedNotifications.forEachIndexed { index, notification ->
189+ notification.params.progressToken shouldBe ProgressToken (" upload-123" )
190+ notification.params.progress shouldBe index * 10.0
191+ notification.params.total shouldBe 100.0
192+ }
193+ }
194+
179195 val listToolsResult = client.listTools()
180196
181197 listToolsResult.tools shouldContain Tool (
0 commit comments