diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 78f55fe46..13cabf4e9 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -21,7 +21,7 @@ kotlinx-io = "0.9.0" ktor = "3.3.3" logging = "8.0.01" mockk = "1.14.9" -mokksy = "0.8.1" +mokksy = "0.9.1" serialization = "1.10.0" slf4j = "2.0.17" logback = "1.5.32" diff --git a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/MockMcp.kt b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/MockMcp.kt index fb719041c..512dc878d 100644 --- a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/MockMcp.kt +++ b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/MockMcp.kt @@ -25,7 +25,7 @@ const val MCP_SESSION_ID_HEADER = "MCP-Session-Id" internal class MockMcp(verbose: Boolean = false) : AutoCloseable { - private val mokksy: Mokksy = Mokksy(verbose = verbose).apply { + private val mokksy = Mokksy(verbose = verbose).apply { start() } @@ -90,7 +90,7 @@ internal class MockMcp(verbose: Boolean = false) : AutoCloseable { expectedSessionId: String? = null, vararg bodyPredicates: (JSONRPCRequest) -> Boolean, ): BuildingStep = mokksy.method( - configuration = StubConfiguration(removeAfterMatch = true), + configuration = StubConfiguration.once(), httpMethod = httpMethod, requestType = JSONRPCRequest::class, ) { @@ -219,20 +219,18 @@ internal class MockMcp(verbose: Boolean = false) : AutoCloseable { sessionId = sessionId, ) respondsWithSseStream { headers += MCP_SESSION_ID_HEADER to sessionId - this.flow = block.invoke() + flow = block.invoke() } } fun mockUnsubscribeRequest(sessionId: String) { mokksy.delete( - configuration = StubConfiguration(removeAfterMatch = true), + configuration = StubConfiguration.once(), requestType = JSONRPCRequest::class, ) { path("/mcp") containsHeader(MCP_SESSION_ID_HEADER, sessionId) - } respondsWith { - body = null - } + } respondsWithStatus HttpStatusCode.OK } override fun close() { diff --git a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt index a1fe89e66..8cb96aa1c 100644 --- a/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt +++ b/kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt @@ -1,5 +1,6 @@ package io.modelcontextprotocol.kotlin.sdk.client +import io.kotest.assertions.nondeterministic.eventually import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.shouldBe import io.ktor.http.ContentType @@ -9,17 +10,22 @@ import io.ktor.sse.ServerSentEvent import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities import io.modelcontextprotocol.kotlin.sdk.types.EmptyJsonObject import io.modelcontextprotocol.kotlin.sdk.types.Implementation +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification +import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken import io.modelcontextprotocol.kotlin.sdk.types.Tool import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.delay import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.buildJsonObject import kotlinx.serialization.json.put import kotlinx.serialization.json.putJsonObject import org.junit.jupiter.api.TestInstance +import java.util.concurrent.CopyOnWriteArrayList import kotlin.test.Test import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds @@ -46,28 +52,29 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { mockMcp.onJSONRPCRequest( httpMethod = HttpMethod.Post, jsonRpcMethod = "initialize", - ).respondsWithStream { + ) respondsWithSseStream { headers += MCP_SESSION_ID_HEADER to sessionId - flow = flowOf( - "id: ${Uuid.random()}\n", - "data:\n", // empty data - "\n", - "id: ${Uuid.random()}\n", - "data: \t \n", // tabs and spaces - "\n", - "id: ${Uuid.random()}\n", - "event: message\n", - // multiline data - "data: {\n", - "data: \"result\":{\n" + - "data: \"protocolVersion\":\"2025-06-18\",\n" + - "data: \"capabilities\":{},\n" + - "data: \"serverInfo\":{\"name\":\"simple-streamable-http-server\",\"version\":\"1.0.0\"}\n" + - "data: },\n" + - "data: \"jsonrpc\":\"2.0\",\n" + - "data: \"id\":\"7ce065b0678f49e5b04ce5a0fcc7d518\"\n" + - "data: }\n", - "\n", + // empty data — should be skipped by client + chunks += ServerSentEvent(data = "", id = Uuid.random().toString()) + // whitespace-only data — should be skipped by client + chunks += ServerSentEvent(data = " \t ", id = Uuid.random().toString()) + // valid initialize response with multiline JSON + @Suppress("MaxLineLength") + chunks += ServerSentEvent( + event = "message", + id = Uuid.random().toString(), + //language=json + data = """{ + |"result":{ + | "protocolVersion":"2025-06-18", + | "capabilities":{}, + | "serverInfo":{"name":"simple-streamable-http-server","version":"1.0.0"} + |}, + |"jsonrpc":"2.0", + |"id":"7ce065b0678f49e5b04ce5a0fcc7d518" + |} + | + """.trimMargin(), ) } @@ -111,30 +118,30 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { statusCode = HttpStatusCode.Accepted, ) + @Suppress("MaxLineLength") mockMcp.handleSubscribeWithGet(sessionId) { flow { - delay(500.milliseconds) - emit( - ServerSentEvent( - event = "message", - id = "1", - data = @Suppress("MaxLineLength") - //language=json - """{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""", - ), - ) - delay(200.milliseconds) - emit( - ServerSentEvent( - data = @Suppress("MaxLineLength") - //language=json - """{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""", - ), - ) + for (i in 0..10) { + delay(100.milliseconds) + emit( + ServerSentEvent( + event = "message", + id = "1", + //language=json + data = + """{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":${i * 10},"total":100}}""", + ), + ) + } + awaitCancellation() } } - // TODO: how to get notifications via Client API? + val receivedNotifications = CopyOnWriteArrayList() + client.setNotificationHandler(Method.Defined.NotificationsProgress) { + receivedNotifications.add(it) + CompletableDeferred(Unit) + } mockMcp.handleWithResult( jsonRpcMethod = "tools/list", @@ -176,6 +183,16 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { connect(client) + eventually(5.seconds) { + receivedNotifications.size shouldBe 11 // 0..100 with step 10 + + receivedNotifications.forEachIndexed { index, notification -> + notification.params.progressToken shouldBe ProgressToken("upload-123") + notification.params.progress shouldBe index * 10.0 + notification.params.total shouldBe 100.0 + } + } + val listToolsResult = client.listTools() listToolsResult.tools shouldContain Tool(