|
30 | 30 | import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; |
31 | 31 | import io.modelcontextprotocol.spec.McpTransportStream; |
32 | 32 | import io.modelcontextprotocol.util.Assert; |
| 33 | +import io.modelcontextprotocol.util.Utils; |
33 | 34 | import reactor.core.Disposable; |
34 | 35 | import reactor.core.publisher.Flux; |
35 | 36 | import reactor.core.publisher.Mono; |
@@ -244,7 +245,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) { |
244 | 245 |
|
245 | 246 | Disposable connection = webClient.post() |
246 | 247 | .uri(this.endpoint) |
247 | | - .accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON) |
| 248 | + .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) |
248 | 249 | .headers(httpHeaders -> { |
249 | 250 | transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id)); |
250 | 251 | }) |
@@ -287,7 +288,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { |
287 | 288 | logger.trace("Received response to POST for session {}", sessionRepresentation); |
288 | 289 | // communicate to caller the message was delivered |
289 | 290 | sink.success(); |
290 | | - return responseFlux(response); |
| 291 | + return directResponseFlux(message, response); |
291 | 292 | } |
292 | 293 | else { |
293 | 294 | logger.warn("Unknown media type {} returned for POST in session {}", contentType, |
@@ -385,14 +386,22 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes |
385 | 386 | return transportSession.sessionId().orElse("[missing_session_id]"); |
386 | 387 | } |
387 | 388 |
|
388 | | - private Flux<McpSchema.JSONRPCMessage> responseFlux(ClientResponse response) { |
| 389 | + private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage, |
| 390 | + ClientResponse response) { |
389 | 391 | return response.bodyToMono(String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> { |
390 | 392 | try { |
391 | | - McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, |
392 | | - responseMessage); |
393 | | - s.next(List.of(jsonRpcResponse)); |
| 393 | + if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { |
| 394 | + logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage); |
| 395 | + s.complete(); |
| 396 | + } |
| 397 | + else { |
| 398 | + McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, |
| 399 | + responseMessage); |
| 400 | + s.next(List.of(jsonRpcResponse)); |
| 401 | + } |
394 | 402 | } |
395 | 403 | catch (IOException e) { |
| 404 | + // TODO: this should be a McpTransportError |
396 | 405 | s.error(e); |
397 | 406 | } |
398 | 407 | }).flatMapIterable(Function.identity()); |
@@ -424,7 +433,8 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve |
424 | 433 | } |
425 | 434 | } |
426 | 435 | else { |
427 | | - throw new McpError("Received unrecognized SSE event type: " + event.event()); |
| 436 | + logger.debug("Received SSE event with type: {}", event); |
| 437 | + return Tuples.of(Optional.empty(), List.of()); |
428 | 438 | } |
429 | 439 | } |
430 | 440 |
|
|
0 commit comments