messages are not handled asynchronously #176

Description

@phemmer

Describe the bug
The server (and probably the client, though I didn't test it) does not handle messages concurrently. Whenever handling of an incoming message blocks, all incoming messages are blocked, no matter what kind of message it is (a request, a response, or a notification).

In my case, I am supporting roots. To provide the correct list of tools, I need to request the roots from the client. If a tools/list request or tool call comes in before I have obtained the roots, I have to block handling of that call until I have them. However, as soon as I block a call, I can't receive the client's response to roots/list: that response can't be processed until the blocked call returns, and the blocked call is waiting for that response.

While that's the scenario that originally prompted me to chase down the issue, the core problem seems to apply to all input handling. Any time the handling of any message from the client is blocked, all message handling from the client is blocked.
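The roots deadlock can be modeled without the SDK. In the sketch below, a fixed-size Java thread pool stands in for the message-processing loop (the SDK itself uses coroutines), and the function name `toolsListGetsRoots` and the root URI are hypothetical. With one thread, the tools/list handler gives up because the roots/list response is queued behind it; with two threads, the response is processed while the handler waits.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical model, not SDK code: each incoming message becomes a task on a thread pool.
// Returns true if the tools/list handler obtained the roots before giving up.
fun toolsListGetsRoots(threads: Int, waitMillis: Long): Boolean {
    // threads = 1 means messages are handled strictly one after another, in arrival order.
    val loop = Executors.newFixedThreadPool(threads)
    val roots = CompletableFuture<List<String>>()

    // Message 1: a tools/list request whose handler blocks until the roots arrive.
    val toolsList = loop.submit(Callable {
        try {
            roots.get(waitMillis, TimeUnit.MILLISECONDS)
            true
        } catch (e: TimeoutException) {
            false // gave up: the roots/list response was never processed in time
        }
    })

    // Message 2: the client's roots/list response (hypothetical URI), which would unblock message 1.
    loop.execute { roots.complete(listOf("file:///workspace")) }

    val gotRoots = toolsList.get()
    loop.shutdown()
    return gotRoots
}
```

With `threads = 1` the call returns `false` once `waitMillis` elapses; with `threads = 2` it returns `true` almost immediately.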

To Reproduce
This test reproduces the issue with two concurrent tool calls: a "slow" tool and a "fast" tool. In the test you can see that the slow tool prevents the fast tool from completing. It's simpler than the roots scenario described above, but the underlying cause appears to be the same.

package server

import io.github.oshai.kotlinlogging.KotlinLogging
import io.modelcontextprotocol.kotlin.sdk.CallToolResult
import io.modelcontextprotocol.kotlin.sdk.ClientCapabilities
import io.modelcontextprotocol.kotlin.sdk.Implementation
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.ServerCapabilities
import io.modelcontextprotocol.kotlin.sdk.TextContent
import io.modelcontextprotocol.kotlin.sdk.client.Client
import io.modelcontextprotocol.kotlin.sdk.client.ClientOptions
import io.modelcontextprotocol.kotlin.sdk.server.Server
import io.modelcontextprotocol.kotlin.sdk.server.ServerOptions
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Test
import kotlin.test.assertNotNull
import kotlin.test.assertTrue

private val logger = KotlinLogging.logger {}

/**
 * ChannelBasedInMemoryTransport is a transport that uses a Channel to simulate the asynchronous nature of a real
 * network socket.
 * The normal InMemoryTransport is not susceptible to the issue because it uses the client's coroutine for server-side
 * processing of the request. The ChannelBasedInMemoryTransport, as well as StdioServerTransport and likely any other
 * real transport, are susceptible to the issue as client and server are in separate execution contexts.
 */
private class ChannelBasedInMemoryTransport(private val scope: CoroutineScope, private val name: String) :
    AbstractTransport() {
    private lateinit var otherTransport: ChannelBasedInMemoryTransport
    private val incomingChannel = Channel<JSONRPCMessage>(Channel.UNLIMITED)

    companion object {
        fun createLinkedPair(scope: CoroutineScope): Pair<ChannelBasedInMemoryTransport, ChannelBasedInMemoryTransport> {
            val clientTransport = ChannelBasedInMemoryTransport(scope, "ClientTransport")
            val serverTransport = ChannelBasedInMemoryTransport(scope, "ServerTransport")
            clientTransport.otherTransport = serverTransport
            serverTransport.otherTransport = clientTransport
            return Pair(clientTransport, serverTransport)
        }
    }

    override suspend fun start() {
        scope.launch {
            for (message in incomingChannel) {
                logger.debug { "[$name] RECV: $message" }
                _onMessage(message)
            }
        }
    }

    override suspend fun close() {
        if (!incomingChannel.isClosedForReceive) {
            incomingChannel.close()
            otherTransport.incomingChannel.close()
            _onClose()
        }
    }

    override suspend fun send(message: JSONRPCMessage) {
        logger.debug { "[$name] SEND: $message" }
        if (otherTransport.incomingChannel.isClosedForSend) {
            throw IllegalStateException("Not connected")
        }
        otherTransport.incomingChannel.send(message)
    }
}

class ConcurrencyTest {

    @Test
    fun `server blocks concurrent requests due to serial processing`() = runTest {
        val slowToolDelay = 1000L
        // The test timeout must be longer than the slow tool's delay.
        withTimeout(slowToolDelay + 500) {
            // Arrange: Server with tools capability
            val server = Server(
                serverInfo = Implementation("test-server", "1.0"),
                options = ServerOptions(capabilities = ServerCapabilities(tools = ServerCapabilities.Tools(null)))
            )

            // Arrange: Add a "slow" tool and a "fast" tool using the convenient overload
            server.addTool("slow_tool", "A tool that takes a while") {
                logger.info { "[Server] 'slow_tool' handler started." }
                delay(slowToolDelay)
                logger.info { "[Server] 'slow_tool' handler finished." }
                CallToolResult(content = listOf(TextContent("slow_tool_done")))
            }
            server.addTool("fast_tool", "A tool that is quick") {
                logger.info { "[Server] 'fast_tool' handler executed." }
                CallToolResult(content = listOf(TextContent("fast_tool_done")))
            }

            // Arrange: Client
            val client = Client(
                clientInfo = Implementation("test-client", "1.0"),
                options = ClientOptions()
            )

            // Arrange: Transport and connection
            val (clientTransport, serverTransport) = ChannelBasedInMemoryTransport.createLinkedPair(this)
            val serverInitialized = CompletableDeferred<Unit>()
            server.onInitialized { serverInitialized.complete(Unit) }

            launch { server.connect(serverTransport) }
            launch { client.connect(clientTransport) }
            serverInitialized.await()

            // Act: Launch two concurrent tool calls. The slow one first, then the fast one.
            val startTime = currentTime

            val slowJob = launch {
                logger.info { "[Client] Calling 'slow_tool'..." }
                val result = client.callTool("slow_tool", emptyMap())
                assertNotNull(result)
            }
            // Give the first request a moment to be sent and start processing on the server
            delay(50)

            val fastJob = launch {
                logger.info { "[Client] Calling 'fast_tool'..." }
                val result = client.callTool("fast_tool", emptyMap())
                assertNotNull(result)
            }

            // Wait for both to complete
            slowJob.join()
            fastJob.join()

            val duration = currentTime - startTime

            // Assert: the total time is at least the slow tool's delay. Because both jobs are
            // joined, this holds whether the calls run serially or in parallel (in virtual time
            // the fast tool adds no delay); the blocking itself shows up in the log ordering,
            // where 'fast_tool' only executes after 'slow_tool' has finished.
            logger.info { "Total execution time: ${duration}ms" }
            assertTrue(
                duration >= slowToolDelay,
                "Fast tool was blocked by slow tool. Total duration (${duration}ms) should be >= slow tool delay (${slowToolDelay}ms)."
            )
        }
    }
}

Expected behavior
The server should be able to process messages concurrently, i.e. handle an incoming request or response from the client while a response to the client is still outstanding.
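One possible shape for this behavior is a single reader that only dispatches: responses are matched to pending outgoing requests immediately, and each request handler runs on a worker. The sketch below models that with Java executors rather than coroutines; `Message`, `Request`, `Response`, `Dispatcher`, and the 300 ms `slow_tool` delay are all hypothetical and are not part of the MCP Kotlin SDK.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical, JSON-RPC-shaped message types; these are not the SDK's classes.
sealed interface Message
data class Request(val id: Int, val method: String) : Message
data class Response(val id: Int, val result: String) : Message

class Dispatcher(private val workers: ExecutorService = Executors.newCachedThreadPool()) {
    // Names of handled requests, in the order their handlers finished.
    val completed = CopyOnWriteArrayList<String>()
    private val pending = ConcurrentHashMap<Int, CompletableFuture<String>>()

    // Register an outgoing request (e.g. roots/list); the future completes when its Response is read.
    fun expectResponse(id: Int): CompletableFuture<String> =
        CompletableFuture<String>().also { pending[id] = it }

    // Called by a single reader loop, in arrival order. It never runs a handler itself.
    fun onMessage(message: Message) {
        when (message) {
            // Responses are resolved inline, so they never queue behind a running handler.
            is Response -> pending.remove(message.id)?.complete(message.result)
            // Requests go to the worker pool, so a slow handler does not stall the reader.
            is Request -> workers.execute { handle(message) }
        }
    }

    private fun handle(request: Request) {
        if (request.method == "slow_tool") Thread.sleep(300) // stand-in for a slow tool
        completed += request.method
    }

    // Stop accepting work and wait for in-flight handlers to finish.
    fun awaitIdle(): Boolean {
        workers.shutdown()
        return workers.awaitTermination(5, TimeUnit.SECONDS)
    }
}
```

Passing `Executors.newSingleThreadExecutor()` makes handlers run one at a time, so `completed` comes out as `slow_tool` then `fast_tool`, matching the ordering in the logs; with the default cached pool, `fast_tool` finishes first. In coroutine terms this would roughly correspond to launching a child coroutine per incoming request instead of running the handler inline in the read loop, which is an assumption about a possible fix rather than a description of the SDK.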

Logs
The above test fails with:

[Test worker @kotlinx.coroutines.test runner#2] INFO io.modelcontextprotocol.kotlin.sdk.server.Server - Registering tool: slow_tool
[Test worker @kotlinx.coroutines.test runner#2] INFO io.modelcontextprotocol.kotlin.sdk.server.Server - Registering tool: fast_tool
[Test worker @coroutine#5] INFO io.modelcontextprotocol.kotlin.sdk.server.Server - Handling initialize request from client Implementation(name=test-client, version=1.0)
[Test worker @coroutine#7] INFO server.ConcurrencyTest - [Client] Calling 'slow_tool'...
[Test worker @coroutine#5] INFO server.ConcurrencyTest - [Server] 'slow_tool' handler started.
[Test worker @coroutine#8] INFO server.ConcurrencyTest - [Client] Calling 'fast_tool'...
[Test worker @coroutine#5] INFO server.ConcurrencyTest - [Server] 'slow_tool' handler finished.
[Test worker @coroutine#5] INFO server.ConcurrencyTest - [Server] 'fast_tool' handler executed.
[Test worker @kotlinx.coroutines.test runner#2] INFO server.ConcurrencyTest - Total execution time: 1000ms

Timed out after 1.5s of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
kotlinx.coroutines.TimeoutCancellationException: Timed out after 1.5s of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:189)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:157)
	at kotlinx.coroutines.test.TestDispatcher.processEvent$kotlinx_coroutines_test(TestDispatcher.kt:24)
	at kotlinx.coroutines.test.TestCoroutineScheduler.tryRunNextTaskUnless$kotlinx_coroutines_test(TestCoroutineScheduler.kt:99)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1$workRunner$1.invokeSuspend(TestBuilders.kt:326)
	at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt:42)
	at server.ConcurrencyTest$server blocks concurrent requests due to serial processing$1.invokeSuspend(ConcurrencyTest.kt:83)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1$1.invokeSuspend(TestBuilders.kt:317)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out after 1.5s of _virtual_ (kotlinx.coroutines.test) time. To use the real time, wrap 'withTimeout' in 'withContext(Dispatchers.Default.limitedParallelism(1))'
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:189)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:157)
	at kotlinx.coroutines.test.TestDispatcher.processEvent$kotlinx_coroutines_test(TestDispatcher.kt:24)
	at kotlinx.coroutines.test.TestCoroutineScheduler.tryRunNextTaskUnless$kotlinx_coroutines_test(TestCoroutineScheduler.kt:99)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1$workRunner$1.invokeSuspend(TestBuilders.kt:326)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)

Additional context
Reproduced with commit 5347a7a.

Metadata

Assignees: No one assigned

Labels: P1 (Significant bug affecting many users), bug (Something isn't working), ready for work (Has enough information to start)