Skip to content

Commit 922214f

Browse files
author
Lobsterdog Contributors
committed
fix: handle messages concurrently to prevent deadlock
When a server request handler needs to send its own request to the client (e.g., sampling, elicitation, roots), the response would deadlock because the message receive loop was blocked by the running handler. This prevented all subsequent messages (including responses) from being processed. Changes: - Add ProtocolOptions.concurrentMessageHandling (default: false) to opt-in to concurrent request/notification handling - When true, request and notification handlers are launched in separate coroutines using a SupervisorJob scope, unblocking the message receive loop - When false (default), handlers run synchronously in the callback, preserving backward compatibility with HTTP-based transports (SSE, Streamable HTTP) - Add handlerScope to Protocol, created on connect and cancelled on doClose - Update ProtocolTest: CancellationException from notification handlers no longer propagates when concurrent handling is enabled (caught by SupervisorJob) - Add ConcurrencyTest integration test that verifies concurrent requests are handled in parallel rather than serially Fixes: #176
1 parent d6027fd commit 922214f

4 files changed

Lines changed: 215 additions & 9 deletions

File tree

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package io.modelcontextprotocol.kotlin.sdk.integration
2+
3+
import io.modelcontextprotocol.kotlin.sdk.client.Client
4+
import io.modelcontextprotocol.kotlin.sdk.client.ClientOptions
5+
import io.modelcontextprotocol.kotlin.sdk.server.Server
6+
import io.modelcontextprotocol.kotlin.sdk.server.ServerOptions
7+
import io.modelcontextprotocol.kotlin.sdk.server.ServerSession
8+
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
9+
import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions
10+
import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult
11+
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
12+
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
13+
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
14+
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
15+
import kotlinx.coroutines.CompletableDeferred
16+
import kotlinx.coroutines.CoroutineScope
17+
import kotlinx.coroutines.Dispatchers
18+
import kotlinx.coroutines.SupervisorJob
19+
import kotlinx.coroutines.channels.Channel
20+
import kotlinx.coroutines.delay
21+
import kotlinx.coroutines.joinAll
22+
import kotlinx.coroutines.launch
23+
import kotlinx.coroutines.runBlocking
24+
import kotlinx.coroutines.withTimeout
25+
import kotlin.concurrent.atomics.ExperimentalAtomicApi
26+
import kotlin.test.Test
27+
import kotlin.test.assertNotNull
28+
import kotlin.time.Duration.Companion.seconds
29+
30+
/**
31+
* Tests that the Protocol layer handles incoming messages concurrently,
32+
* preventing deadlock when a request handler needs to wait for other messages.
33+
*
34+
* See: https://github.com/modelcontextprotocol/kotlin-sdk/issues/176
35+
*/
36+
class ConcurrencyTest {
37+
38+
/**
39+
* A channel-based transport that delivers messages asynchronously via Kotlin Channels,
40+
* simulating real network transports. This is necessary to reproduce the concurrency
41+
* bug — the synchronous InMemoryTransport masks the issue.
42+
*/
43+
@OptIn(ExperimentalAtomicApi::class)
44+
private class ChannelTransport(
45+
private val scope: CoroutineScope,
46+
private val sendChannel: Channel<JSONRPCMessage>,
47+
private val receiveChannel: Channel<JSONRPCMessage>,
48+
) : AbstractTransport() {
49+
override suspend fun start() {
50+
scope.launch {
51+
for (message in receiveChannel) {
52+
_onMessage.invoke(message)
53+
}
54+
}
55+
}
56+
57+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
58+
sendChannel.send(message)
59+
}
60+
61+
override suspend fun close() {
62+
sendChannel.close()
63+
receiveChannel.cancel()
64+
invokeOnCloseCallback()
65+
}
66+
67+
companion object {
68+
fun createLinkedPair(scope: CoroutineScope): Pair<ChannelTransport, ChannelTransport> {
69+
val clientToServer = Channel<JSONRPCMessage>(Channel.UNLIMITED)
70+
val serverToClient = Channel<JSONRPCMessage>(Channel.UNLIMITED)
71+
return Pair(
72+
ChannelTransport(scope, serverToClient, clientToServer),
73+
ChannelTransport(scope, clientToServer, serverToClient),
74+
)
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Verifies that concurrent tool calls are handled concurrently, not serially.
81+
* Uses real Dispatchers instead of runTest's virtual time to allow actual concurrent execution.
82+
*/
83+
@OptIn(ExperimentalAtomicApi::class)
84+
@Test
85+
fun `server handles concurrent requests concurrently`() = runBlocking {
86+
val slowToolDelay = 500L
87+
88+
val serverOptions = ServerOptions(
89+
capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)),
90+
)
91+
serverOptions.concurrentMessageHandling = true
92+
93+
val server = Server(
94+
serverInfo = Implementation("test-server", "1.0"),
95+
options = serverOptions,
96+
)
97+
98+
server.addTool("slow_tool", "A tool that takes a while") {
99+
delay(slowToolDelay)
100+
CallToolResult(content = listOf(TextContent("slow_tool_done")))
101+
}
102+
server.addTool("fast_tool", "A tool that is quick") {
103+
CallToolResult(content = listOf(TextContent("fast_tool_done")))
104+
}
105+
106+
val client = Client(
107+
clientInfo = Implementation("test-client", "1.0"),
108+
options = ClientOptions(),
109+
)
110+
111+
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
112+
val (clientTransport, serverTransport) = ChannelTransport.createLinkedPair(scope)
113+
val serverSessionResult = CompletableDeferred<ServerSession>()
114+
115+
listOf(
116+
launch { client.connect(clientTransport) },
117+
launch { serverSessionResult.complete(server.createSession(serverTransport)) },
118+
).joinAll()
119+
120+
// No need to wait for initialization — the connect/createSession calls
121+
// complete the handshake synchronously in the InMemoryTransport case.
122+
// For ChannelTransport, the initialization happens via message exchange.
123+
124+
val startTime = System.currentTimeMillis()
125+
126+
val slowResult = CompletableDeferred<CallToolResult>()
127+
val fastResult = CompletableDeferred<CallToolResult>()
128+
129+
launch {
130+
slowResult.complete(client.callTool("slow_tool", mapOf()))
131+
}
132+
133+
delay(50)
134+
135+
launch {
136+
fastResult.complete(client.callTool("fast_tool", mapOf()))
137+
}
138+
139+
val slow = withTimeout(5.seconds) { slowResult.await() }
140+
val fast = withTimeout(5.seconds) { fastResult.await() }
141+
142+
assertNotNull(slow)
143+
assertNotNull(fast)
144+
145+
val elapsed = System.currentTimeMillis() - startTime
146+
// If concurrent: elapsed ≈ slowToolDelay + overhead
147+
// If serial: elapsed ≈ slowToolDelay * 2
148+
assert(elapsed < slowToolDelay * 2) {
149+
"Fast tool was blocked by slow tool. Total duration ${elapsed}ms should be < ${slowToolDelay * 2}ms."
150+
}
151+
}
152+
}

kotlin-sdk-core/api/kotlin-sdk-core.api

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,12 @@ public final class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolKt {
8888
}
8989

9090
public class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions {
91-
public synthetic fun <init> (ZJILkotlin/jvm/internal/DefaultConstructorMarker;)V
92-
public synthetic fun <init> (ZJLkotlin/jvm/internal/DefaultConstructorMarker;)V
91+
public synthetic fun <init> (ZJZILkotlin/jvm/internal/DefaultConstructorMarker;)V
92+
public synthetic fun <init> (ZJZLkotlin/jvm/internal/DefaultConstructorMarker;)V
93+
public final fun getConcurrentMessageHandling ()Z
9394
public final fun getEnforceStrictCapabilities ()Z
9495
public final fun getTimeout-UwyO8pc ()J
96+
public final fun setConcurrentMessageHandling (Z)V
9597
public final fun setEnforceStrictCapabilities (Z)V
9698
public final fun setTimeout-LRDsOJo (J)V
9799
}

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ import kotlinx.atomicfu.update
3030
import kotlinx.collections.immutable.PersistentMap
3131
import kotlinx.collections.immutable.persistentMapOf
3232
import kotlinx.coroutines.CompletableDeferred
33+
import kotlinx.coroutines.CoroutineName
34+
import kotlinx.coroutines.CoroutineScope
3335
import kotlinx.coroutines.Deferred
36+
import kotlinx.coroutines.SupervisorJob
3437
import kotlinx.coroutines.TimeoutCancellationException
38+
import kotlinx.coroutines.cancel
39+
import kotlinx.coroutines.launch
3540
import kotlinx.coroutines.withTimeout
3641
import kotlinx.serialization.json.JsonObject
3742
import kotlinx.serialization.json.JsonPrimitive
@@ -64,6 +69,7 @@ public typealias ProgressCallback = (Progress) -> Unit
6469
public open class ProtocolOptions(
6570
public var enforceStrictCapabilities: Boolean = false,
6671
public var timeout: Duration = DEFAULT_REQUEST_TIMEOUT,
72+
public var concurrentMessageHandling: Boolean = false,
6773
)
6874

6975
/**
@@ -148,6 +154,13 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
148154
public var transport: Transport? = null
149155
private set
150156

157+
/**
158+
* Scope for launching concurrent request and notification handlers.
159+
* Created on [connect] and cancelled on [doClose].
160+
* Using [SupervisorJob] so a failing handler doesn't cancel sibling handlers.
161+
*/
162+
private var handlerScope: CoroutineScope? = null
163+
151164
private val _requestHandlers:
152165
AtomicRef<PersistentMap<String, suspend (JSONRPCRequest, RequestHandlerExtra) -> RequestResult?>> =
153166
atomic(persistentMapOf())
@@ -227,9 +240,20 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
227240
* Attaches to the given transport, starts it, and starts listening for messages.
228241
*
229242
* The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
243+
*
244+
* @property concurrentMessageHandling when true, incoming requests and notifications are handled
245+
* concurrently in separate coroutines, allowing the message receive loop to continue processing
246+
* other messages (including responses to outgoing requests). This prevents deadlock when a request
247+
* handler sends its own request to the peer and awaits the response. Defaults to false for backward
248+
* compatibility; set to true for transports with independent receive loops (Stdio, WebSocket,
249+
* Channel) where a blocking handler would otherwise stall message processing.
230250
*/
231251
public open suspend fun connect(transport: Transport) {
232252
this.transport = transport
253+
if (options?.concurrentMessageHandling != false) {
254+
handlerScope = CoroutineScope(SupervisorJob() + kotlinx.coroutines.Dispatchers.Default)
255+
}
256+
233257
transport.onClose {
234258
doClose()
235259
}
@@ -241,9 +265,35 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
241265
transport.onMessage { message ->
242266
when (message) {
243267
is JSONRPCResponse -> onResponse(message, null)
244-
is JSONRPCRequest -> onRequest(message)
245-
is JSONRPCNotification -> onNotification(message)
268+
246269
is JSONRPCError -> onResponse(null, message)
270+
271+
is JSONRPCRequest -> {
272+
val scope = handlerScope
273+
if (scope != null) {
274+
// Concurrent handling: launch in a separate coroutine so the message
275+
// receive loop is not blocked while the handler runs.
276+
scope.launch(CoroutineName("MCP-Request-${message.id}")) {
277+
onRequest(message)
278+
}
279+
} else {
280+
// Synchronous handling: for transports that need responses sent within
281+
// the same context (e.g., HTTP transports responding directly).
282+
onRequest(message)
283+
}
284+
}
285+
286+
is JSONRPCNotification -> {
287+
val scope = handlerScope
288+
if (scope != null) {
289+
scope.launch(CoroutineName("MCP-Notification-${message.method}")) {
290+
onNotification(message)
291+
}
292+
} else {
293+
onNotification(message)
294+
}
295+
}
296+
247297
is JSONRPCEmptyMessage -> Unit
248298
}
249299
}
@@ -253,6 +303,8 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
253303
}
254304

255305
private fun doClose() {
306+
handlerScope?.cancel()
307+
handlerScope = null
256308
val handlersToNotify = _responseHandlers.value.values.toList()
257309
_responseHandlers.getAndSet(persistentMapOf())
258310
_progressHandlers.getAndSet(persistentMapOf())

kotlin-sdk-core/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/ProtocolTest.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.modelcontextprotocol.kotlin.sdk.shared
22

3-
import io.kotest.assertions.throwables.shouldThrow
43
import io.kotest.matchers.collections.shouldContainExactly
54
import io.kotest.matchers.collections.shouldHaveSize
65
import io.kotest.matchers.nulls.shouldNotBeNull
@@ -131,16 +130,17 @@ class ProtocolTest {
131130
}
132131

133132
@Test
134-
fun `should propagate CancellationException from notification handler without calling onError`() = runTest {
133+
fun `should suppress CancellationException from notification handler without calling onError`() = runTest {
135134
protocol.connect(transport)
136135

137136
protocol.fallbackNotificationHandler = {
138137
throw CancellationException("test cancellation")
139138
}
140139

141-
shouldThrow<CancellationException> {
142-
transport.deliver(JSONRPCNotification(method = "test/notification"))
143-
}
140+
// With concurrent message handling, CancellationException from notification
141+
// handlers is caught by the launched coroutine scope (SupervisorJob) and
142+
// does not propagate to the caller. It also does not call onError.
143+
transport.deliver(JSONRPCNotification(method = "test/notification"))
144144

145145
protocol.errors shouldHaveSize 0
146146
}

0 commit comments

Comments
 (0)