Skip to content

Commit 12fee57

Browse files
committed
fix: concurrent message processing for all transports
1 parent c136ad9 commit 12fee57

35 files changed

Lines changed: 789 additions & 373 deletions

File tree

AGENTS.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ Follow these rules to keep changes safe, comprehensible, and easy to maintain.
7777
- **Prioritize test readability**
7878
- Avoid creating too many test methods; use parametrized tests when testing multiple similar scenarios
7979
- When running tests on Kotlin Multiplatform projects, run JVM tests only unless asked for other platforms
80+
- **Concurrency in Tests**: Always use thread-safe collections (e.g., `Mutex`-protected lists or `Channel`) when
81+
collecting messages from transports that process messages concurrently in the background (like those inheriting from
82+
`AbstractTransport`). Using non-thread-safe `MutableList` will lead to flaky tests or missing messages.
8083

8184
### Test Framework Stack
8285

@@ -93,9 +96,12 @@ Follow these rules to keep changes safe, comprehensible, and easy to maintain.
9396
- **Ktor MockEngine**: For HTTP client mocking (`io.ktor:ktor-client-mock`)
9497
- **Java tests**: Use JUnit5, Mockito, AssertJ core
9598
- **Serialization test utilities** (`io.modelcontextprotocol.kotlin.test.utils`):
96-
- `verifySerialization(value, json, expectedJson)` — serializes, asserts match, round-trips back; use for most serialization tests
97-
- `verifyDeserialization(json, payload)` — deserializes from JSON, re-serializes, asserts match; returns the object for further assertions
98-
- Always test both empty/null/omitted and non-null cases for nullable fields; `McpJson` has `explicitNulls = false` so null properties must be absent from JSON, not `null`
99+
- `verifySerialization(value, json, expectedJson)` — serializes, asserts match, round-trips back; use for most
100+
serialization tests
101+
- `verifyDeserialization(json, payload)` — deserializes from JSON, re-serializes, asserts match; returns the object
102+
for further assertions
103+
- Always test both empty/null/omitted and non-null cases for nullable fields; `McpJson` has `explicitNulls = false`
104+
so null properties must be absent from JSON, not `null`
99105

100106
### Kotest Patterns
101107

@@ -169,6 +175,8 @@ prop.shouldNotBeNull {
169175
- Use Kotlinx Serialization with explicit `@Serializable` annotations
170176
- JSON config is defined in `jsonUtils.kt` as `McpJson` — use it consistently
171177
- Register custom serializers in companion objects
178+
- **SSE Data Concatenation**: When parsing Server-Sent Events (SSE) data, always ensure that multiple `data:` lines are
179+
concatenated with a newline (`\n`) separator, as per the SSE specification.
172180

173181
### Error Handling
174182

integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/ClientTest.kt

Lines changed: 119 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -61,32 +61,33 @@ class ClientTest {
6161
@Test
6262
fun `should initialize with matching protocol version`() = runTest {
6363
var initialised = false
64-
val clientTransport = object : AbstractTransport() {
65-
override suspend fun start() {}
66-
67-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
68-
if (message !is JSONRPCRequest) return
69-
initialised = true
70-
val result = InitializeResult(
71-
protocolVersion = LATEST_PROTOCOL_VERSION,
72-
capabilities = ServerCapabilities(),
73-
serverInfo = Implementation(
74-
name = "test",
75-
version = "1.0",
76-
),
77-
)
78-
79-
val response = JSONRPCResponse(
80-
id = message.id,
81-
result = result,
82-
)
83-
84-
_onMessage.invoke(response)
85-
}
64+
val clientTransport =
65+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
66+
override suspend fun start() {}
67+
68+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
69+
if (message !is JSONRPCRequest) return
70+
initialised = true
71+
val result = InitializeResult(
72+
protocolVersion = LATEST_PROTOCOL_VERSION,
73+
capabilities = ServerCapabilities(),
74+
serverInfo = Implementation(
75+
name = "test",
76+
version = "1.0",
77+
),
78+
)
79+
80+
val response = JSONRPCResponse(
81+
id = message.id,
82+
result = result,
83+
)
84+
85+
handleMessage(response)
86+
}
8687

87-
override suspend fun close() {
88+
override suspend fun close() {
89+
}
8890
}
89-
}
9091

9192
val client = Client(
9293
clientInfo = Implementation(
@@ -107,32 +108,33 @@ class ClientTest {
107108
@Test
108109
fun `should initialize with supported older protocol version`() = runTest {
109110
val oldVersion = SUPPORTED_PROTOCOL_VERSIONS[1]
110-
val clientTransport = object : AbstractTransport() {
111-
override suspend fun start() {}
112-
113-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
114-
if (message !is JSONRPCRequest) return
115-
check(message.method == Method.Defined.Initialize.value)
116-
117-
val result = InitializeResult(
118-
protocolVersion = oldVersion,
119-
capabilities = ServerCapabilities(),
120-
serverInfo = Implementation(
121-
name = "test",
122-
version = "1.0",
123-
),
124-
)
125-
126-
val response = JSONRPCResponse(
127-
id = message.id,
128-
result = result,
129-
)
130-
_onMessage.invoke(response)
131-
}
111+
val clientTransport =
112+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
113+
override suspend fun start() {}
114+
115+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
116+
if (message !is JSONRPCRequest) return
117+
check(message.method == Method.Defined.Initialize.value)
118+
119+
val result = InitializeResult(
120+
protocolVersion = oldVersion,
121+
capabilities = ServerCapabilities(),
122+
serverInfo = Implementation(
123+
name = "test",
124+
version = "1.0",
125+
),
126+
)
127+
128+
val response = JSONRPCResponse(
129+
id = message.id,
130+
result = result,
131+
)
132+
handleMessage(response)
133+
}
132134

133-
override suspend fun close() {
135+
override suspend fun close() {
136+
}
134137
}
135-
}
136138

137139
val client = Client(
138140
clientInfo = Implementation(
@@ -156,34 +158,35 @@ class ClientTest {
156158
@Test
157159
fun `should reject unsupported protocol version`() = runTest {
158160
var closed = false
159-
val clientTransport = object : AbstractTransport() {
160-
override suspend fun start() {}
161-
162-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
163-
if (message !is JSONRPCRequest) return
164-
check(message.method == Method.Defined.Initialize.value)
165-
166-
val result = InitializeResult(
167-
protocolVersion = "invalid-version",
168-
capabilities = ServerCapabilities(),
169-
serverInfo = Implementation(
170-
name = "test",
171-
version = "1.0",
172-
),
173-
)
174-
175-
val response = JSONRPCResponse(
176-
id = message.id,
177-
result = result,
178-
)
179-
180-
_onMessage.invoke(response)
181-
}
161+
val clientTransport =
162+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
163+
override suspend fun start() {}
164+
165+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
166+
if (message !is JSONRPCRequest) return
167+
check(message.method == Method.Defined.Initialize.value)
168+
169+
val result = InitializeResult(
170+
protocolVersion = "invalid-version",
171+
capabilities = ServerCapabilities(),
172+
serverInfo = Implementation(
173+
name = "test",
174+
version = "1.0",
175+
),
176+
)
177+
178+
val response = JSONRPCResponse(
179+
id = message.id,
180+
result = result,
181+
)
182+
183+
handleMessage(response)
184+
}
182185

183-
override suspend fun close() {
184-
closed = true
186+
override suspend fun close() {
187+
closed = true
188+
}
185189
}
186-
}
187190

188191
val client = Client(
189192
clientInfo = Implementation(
@@ -203,19 +206,20 @@ class ClientTest {
203206
@Test
204207
fun `should reject due to non cancellation exception`() = runTest {
205208
var closed = false
206-
val failingTransport = object : AbstractTransport() {
207-
override suspend fun start() {}
208-
209-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
210-
if (message !is JSONRPCRequest) return
211-
check(message.method == Method.Defined.Initialize.value)
212-
throw IllegalStateException("Test error")
213-
}
209+
val failingTransport =
210+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
211+
override suspend fun start() {}
212+
213+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
214+
if (message !is JSONRPCRequest) return
215+
check(message.method == Method.Defined.Initialize.value)
216+
throw IllegalStateException("Test error")
217+
}
214218

215-
override suspend fun close() {
216-
closed = true
219+
override suspend fun close() {
220+
closed = true
221+
}
217222
}
218-
}
219223

220224
val client = Client(
221225
clientInfo = Implementation(
@@ -237,22 +241,23 @@ class ClientTest {
237241
@Test
238242
fun `should rethrow McpException as is`() = runTest {
239243
var closed = false
240-
val failingTransport = object : AbstractTransport() {
241-
override suspend fun start() {}
242-
243-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
244-
if (message !is JSONRPCRequest) return
245-
check(message.method == Method.Defined.Initialize.value)
246-
throw McpException(
247-
code = -32600,
248-
message = "Invalid Request",
249-
)
250-
}
244+
val failingTransport =
245+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
246+
override suspend fun start() {}
247+
248+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
249+
if (message !is JSONRPCRequest) return
250+
check(message.method == Method.Defined.Initialize.value)
251+
throw McpException(
252+
code = -32600,
253+
message = "Invalid Request",
254+
)
255+
}
251256

252-
override suspend fun close() {
253-
closed = true
257+
override suspend fun close() {
258+
closed = true
259+
}
254260
}
255-
}
256261

257262
val client = Client(
258263
clientInfo = Implementation(
@@ -275,22 +280,23 @@ class ClientTest {
275280
@Test
276281
fun `should rethrow StreamableHttpError as is`() = runTest {
277282
var closed = false
278-
val failingTransport = object : AbstractTransport() {
279-
override suspend fun start() {}
280-
281-
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
282-
if (message !is JSONRPCRequest) return
283-
check(message.method == Method.Defined.Initialize.value)
284-
throw StreamableHttpError(
285-
code = 500,
286-
message = "Internal Server Error",
287-
)
288-
}
283+
val failingTransport =
284+
object : AbstractTransport(backgroundScope.coroutineContext, backgroundScope.coroutineContext) {
285+
override suspend fun start() {}
286+
287+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
288+
if (message !is JSONRPCRequest) return
289+
check(message.method == Method.Defined.Initialize.value)
290+
throw StreamableHttpError(
291+
code = 500,
292+
message = "Internal Server Error",
293+
)
294+
}
289295

290-
override suspend fun close() {
291-
closed = true
296+
override suspend fun close() {
297+
closed = true
298+
}
292299
}
293-
}
294300

295301
val client = Client(
296302
clientInfo = Implementation(

integration-test/src/commonTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/InMemoryTransportTest.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification
1010
import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotificationParams
1111
import io.modelcontextprotocol.kotlin.sdk.types.ToolListChangedNotification
1212
import io.modelcontextprotocol.kotlin.sdk.types.toJSON
13+
import kotlinx.coroutines.sync.Mutex
14+
import kotlinx.coroutines.sync.withLock
1315
import kotlinx.coroutines.test.runTest
1416
import kotlin.test.BeforeTest
1517
import kotlin.test.Test
@@ -44,15 +46,16 @@ class InMemoryTransportTest {
4446

4547
@Test
4648
fun `should send message from client to server`() = runTest {
49+
val (client, server) = InMemoryTransport.createLinkedPair(backgroundScope.coroutineContext)
4750
val message = InitializedNotification()
4851

4952
var receivedMessage: JSONRPCMessage? = null
50-
serverTransport.onMessage { msg ->
53+
server.onMessage { msg ->
5154
receivedMessage = msg
5255
}
5356

5457
val rpcNotification = message.toJSON()
55-
clientTransport.send(rpcNotification)
58+
client.send(rpcNotification)
5659
assertEquals(rpcNotification, receivedMessage)
5760
}
5861

@@ -190,8 +193,11 @@ class InMemoryTransportTest {
190193
)
191194

192195
val receivedMessages = mutableListOf<JSONRPCMessage>()
196+
val mutex = Mutex()
193197
clientTransport.onMessage { msg ->
194-
receivedMessages.add(msg)
198+
mutex.withLock {
199+
receivedMessages.add(msg)
200+
}
195201
}
196202

197203
notifications.forEach { notification ->

0 commit comments

Comments
 (0)