Skip to content

Commit 44d5ef8

Browse files
committed
refactor(solon-webrx): mcp adaptation
1 parent 4506611 commit 44d5ef8

4 files changed

Lines changed: 78 additions & 56 deletions

File tree

mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxStreamableHttpTransport.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
* This implementation does not handle backwards compatibility with the <a href=
4949
* "https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">"HTTP
5050
* with SSE" transport</a>. In order to communicate over the phased-out
51-
* <code>2024-11-05</code> protocol, use {@link HttpClientSseClientTransport} or
51+
* <code>2024-11-05</code> protocol, use {@link WebRxSseClientTransport} or
5252
* {@link WebRxSseClientTransport}.
5353
* </p>
5454
*
@@ -103,7 +103,7 @@ private WebRxStreamableHttpTransport(McpJsonMapper jsonMapper,
103103
this.resumableStreams = resumableStreams;
104104
this.openConnectionOnStartup = openConnectionOnStartup;
105105
this.activeSession.set(createTransportSession());
106-
this.supportedProtocolVersions = List.copyOf(supportedProtocolVersions);
106+
this.supportedProtocolVersions = new ArrayList<>(supportedProtocolVersions);
107107
this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
108108
.sorted(Comparator.reverseOrder())
109109
.findFirst()
@@ -160,7 +160,7 @@ private DefaultMcpTransportSession createTransportSession() {
160160

161161
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
162162
var existingSessionId = Optional.ofNullable(existingSession)
163-
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
163+
.filter(session -> !(session instanceof ClosedMcpTransportSession))
164164
.flatMap(McpTransportSession::sessionId)
165165
.orElse(null);
166166
return new ClosedMcpTransportSession<>(existingSessionId);
@@ -496,7 +496,7 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
496496
// We don't support batching ATM and probably won't since the next version
497497
// considers removing it.
498498
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data());
499-
return Tuples.of(Optional.ofNullable(event.id()), List.of(message));
499+
return Tuples.of(Optional.ofNullable(event.id()), Arrays.asList(message));
500500
} catch (IOException ioException) {
501501
throw new McpTransportException("Error parsing JSON-RPC message: " + event.data(), ioException);
502502
}
@@ -521,7 +521,7 @@ public static class Builder {
521521

522522
private boolean openConnectionOnStartup = false;
523523

524-
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
524+
private List<String> supportedProtocolVersions = Arrays.asList(ProtocolVersions.MCP_2024_11_05,
525525
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18);
526526

527527
private Builder(HttpUtilsBuilder webClientBuilder) {

mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727

2828
import java.io.IOException;
2929
import java.time.Duration;
30+
import java.util.Arrays;
31+
import java.util.HashMap;
3032
import java.util.List;
33+
import java.util.Map;
3134
import java.util.concurrent.ConcurrentHashMap;
3235

3336
/**
@@ -109,6 +112,7 @@ public class WebRxSseServerTransportProvider implements McpServerTransportProvid
109112
* Map of active client sessions, keyed by session ID.
110113
*/
111114
private final ConcurrentHashMap<String, McpServerSession> sessions = new ConcurrentHashMap<>();
115+
private final ConcurrentHashMap<String, Context> sessionRequests = new ConcurrentHashMap<>();
112116

113117
private McpTransportContextExtractor<Context> contextExtractor;
114118

@@ -183,7 +187,7 @@ public String getMcpEndpoint() {
183187

184188
@Override
185189
public List<String> protocolVersions() {
186-
return List.of(ProtocolVersions.MCP_2024_11_05);
190+
return Arrays.asList(ProtocolVersions.MCP_2024_11_05);
187191
}
188192

189193
@Override
@@ -267,20 +271,22 @@ private Mono<Entity> doHandleSseConnection(Context request) {
267271
return RxEntity.ok()
268272
.contentType(MimeType.TEXT_EVENT_STREAM_VALUE)
269273
.body(Flux.<SseEvent>create(sink -> {
270-
WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
274+
WebRxSseMcpSessionTransport sessionTransport = new WebRxSseMcpSessionTransport(sink);
271275

272276
McpServerSession session = sessionFactory.create(sessionTransport);
273277
String sessionId = session.getId();
274278

275279
logger.debug("Created new SSE connection for session: {}", sessionId);
276280
sessions.put(sessionId, session);
281+
sessionRequests.put(sessionId, request);
277282

278283
// Send initial endpoint event
279284
logger.debug("Sending initial endpoint event to session: {}", sessionId);
280285
sink.next(new SseEvent().name(ENDPOINT_EVENT_TYPE).data(buildEndpointUrl(sessionId)));
281286
sink.onCancel(() -> {
282287
logger.debug("Session {} cancelled", sessionId);
283288
sessions.remove(sessionId);
289+
sessionRequests.remove(sessionId);
284290
});
285291
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)));
286292
}
@@ -339,14 +345,16 @@ private Mono<Entity> doHandleMessage(Context request) throws Throwable {
339345
return RxEntity.badRequest().body(new McpError("Session ID missing in message endpoint"));
340346
}
341347

342-
McpServerSession session = sessions.get(request.param("sessionId"));
348+
String sessionId = request.param("sessionId");
349+
McpServerSession session = sessions.get(sessionId);
350+
Context sessionRequest = sessionRequests.get(sessionId);
343351

344352
if (session == null) {
345353
return RxEntity.status(StatusCodes.CODE_NOT_FOUND)
346-
.body(new McpError("Session not found: " + request.param("sessionId")));
354+
.body(new McpError("Session not found: " + sessionId));
347355
}
348356

349-
McpTransportContext transportContext = this.contextExtractor.extract(request);
357+
McpTransportContext transportContext = this.contextExtractor.extract(sessionRequest);
350358

351359

352360
return Mono.just(request.body()).flatMap(body -> {
@@ -368,11 +376,11 @@ private Mono<Entity> doHandleMessage(Context request) throws Throwable {
368376
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
369377
}
370378

371-
private class WebFluxMcpSessionTransport implements McpServerTransport {
379+
private class WebRxSseMcpSessionTransport implements McpServerTransport {
372380

373381
private final FluxSink<SseEvent> sink;
374382

375-
public WebFluxMcpSessionTransport(FluxSink<SseEvent> sink) {
383+
public WebRxSseMcpSessionTransport(FluxSink<SseEvent> sink) {
376384
this.sink = sink;
377385
}
378386

@@ -436,8 +444,11 @@ public static class Builder {
436444

437445
private Duration keepAliveInterval;
438446

439-
private McpTransportContextExtractor<Context> contextExtractor = (
440-
serverRequest) -> McpTransportContext.EMPTY;
447+
private McpTransportContextExtractor<Context> contextExtractor = (serverRequest) -> {
448+
Map<String,Object> context = new HashMap<>();
449+
context.put(Context.class.getName(), serverRequest);
450+
return McpTransportContext.create(context);
451+
};
441452

442453
/**
443454
* Sets the McpJsonMapper to use for JSON serialization/deserialization of MCP

mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxStatelessServerTransport.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
import org.noear.solon.core.handle.StatusCodes;
1515
import org.noear.solon.core.util.MimeType;
1616
import org.noear.solon.rx.handle.RxEntity;
17-
import org.noear.solon.web.sse.SseEvent;
1817
import org.slf4j.Logger;
1918
import org.slf4j.LoggerFactory;
2019
import reactor.core.publisher.Mono;
2120

2221
import java.io.IOException;
23-
import java.util.List;
22+
import java.util.HashMap;
23+
import java.util.Map;
2424

2525
/**
2626
* Implementation of a WebFlux based {@link McpStatelessServerTransport}.
@@ -108,7 +108,8 @@ private Mono<Entity> doHandlePost(Context request) throws Throwable {
108108
try {
109109
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
110110

111-
if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
111+
if (message instanceof McpSchema.JSONRPCRequest) {
112+
McpSchema.JSONRPCRequest jsonrpcRequest = (McpSchema.JSONRPCRequest)message;
112113
return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest).flatMap(jsonrpcResponse -> {
113114
try {
114115
String json = jsonMapper.writeValueAsString(jsonrpcResponse);
@@ -121,7 +122,8 @@ private Mono<Entity> doHandlePost(Context request) throws Throwable {
121122
}
122123
});
123124
}
124-
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
125+
else if (message instanceof McpSchema.JSONRPCNotification) {
126+
McpSchema.JSONRPCNotification jsonrpcNotification = (McpSchema.JSONRPCNotification)message;
125127
return this.mcpHandler.handleNotification(transportContext, jsonrpcNotification)
126128
.then(RxEntity.accepted().build());
127129
}
@@ -157,8 +159,11 @@ public static class Builder {
157159

158160
private String mcpEndpoint = "/mcp";
159161

160-
private McpTransportContextExtractor<Context> contextExtractor = (
161-
serverRequest) -> McpTransportContext.EMPTY;
162+
private McpTransportContextExtractor<Context> contextExtractor = (serverRequest) -> {
163+
Map<String,Object> context = new HashMap<>();
164+
context.put(Context.class.getName(), serverRequest);
165+
return McpTransportContext.create(context);
166+
};
162167

163168
private Builder() {
164169
// used by a static method

mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxStreamableServerTransportProvider.java

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.noear.solon.core.handle.StatusCodes;
1818
import org.noear.solon.core.util.MimeType;
1919
import org.noear.solon.rx.handle.RxEntity;
20-
import org.noear.solon.web.sse.SseEmitter;
2120
import org.noear.solon.web.sse.SseEvent;
2221
import org.slf4j.Logger;
2322
import org.slf4j.LoggerFactory;
@@ -30,9 +29,10 @@
3029
import java.io.IOException;
3130
import java.time.Duration;
3231
import java.util.Arrays;
32+
import java.util.HashMap;
3333
import java.util.List;
34+
import java.util.Map;
3435
import java.util.concurrent.ConcurrentHashMap;
35-
import java.util.concurrent.locks.ReentrantLock;
3636

3737
/**
3838
* Server-side implementation of the Model Context Protocol (MCP) streamable transport
@@ -146,7 +146,7 @@ public String getMcpEndpoint() {
146146

147147
@Override
148148
public List<String> protocolVersions() {
149-
return List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26,
149+
return Arrays.asList(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26,
150150
ProtocolVersions.MCP_2025_06_18);
151151
}
152152

@@ -240,7 +240,7 @@ private Mono<Entity> doHandleGet(Context request) {
240240
return RxEntity.ok()
241241
.contentType(MimeType.TEXT_EVENT_STREAM_VALUE)
242242
.body(Flux.<SseEvent>create(sink -> {
243-
WebFluxStreamableMcpSessionTransport sessionTransport = new WebFluxStreamableMcpSessionTransport(
243+
WebRxStreamableMcpSessionTransport sessionTransport = new WebRxStreamableMcpSessionTransport(
244244
sink);
245245
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session
246246
.listeningStream(sessionTransport);
@@ -284,30 +284,30 @@ private Mono<Entity> doHandlePost(Context request) throws Throwable {
284284
return Mono.just(request.body()).<Entity>flatMap(body -> {
285285
try {
286286
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body);
287-
if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest
288-
&& jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) {
289-
var typeReference = new TypeRef<McpSchema.InitializeRequest>() {
290-
};
291-
McpSchema.InitializeRequest initializeRequest = jsonMapper.convertValue(jsonrpcRequest.params(),
292-
typeReference);
293-
McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
294-
.startSession(initializeRequest);
295-
sessions.put(init.session().getId(), init.session());
296-
return init.initResult().map(initializeResult -> {
297-
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
298-
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
299-
try {
300-
return this.jsonMapper.writeValueAsString(jsonrpcResponse);
301-
}
302-
catch (IOException e) {
303-
logger.warn("Failed to serialize initResponse", e);
304-
throw Exceptions.propagate(e);
305-
}
306-
})
307-
.flatMap(initResult -> RxEntity.ok()
308-
.contentType(MimeType.APPLICATION_JSON_VALUE)
309-
.headerSet(HttpHeaders.MCP_SESSION_ID, init.session().getId())
310-
.body(initResult));
287+
if (message instanceof McpSchema.JSONRPCRequest) {
288+
McpSchema.JSONRPCRequest jsonrpcRequest = (McpSchema.JSONRPCRequest) message;
289+
if (jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) {
290+
var typeReference = new TypeRef<McpSchema.InitializeRequest>() {};
291+
McpSchema.InitializeRequest initializeRequest = jsonMapper.convertValue(jsonrpcRequest.params(),
292+
typeReference);
293+
McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
294+
.startSession(initializeRequest);
295+
sessions.put(init.session().getId(), init.session());
296+
return init.initResult().map(initializeResult -> {
297+
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
298+
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
299+
try {
300+
return this.jsonMapper.writeValueAsString(jsonrpcResponse);
301+
} catch (IOException e) {
302+
logger.warn("Failed to serialize initResponse", e);
303+
throw Exceptions.propagate(e);
304+
}
305+
})
306+
.flatMap(initResult -> RxEntity.ok()
307+
.contentType(MimeType.APPLICATION_JSON_VALUE)
308+
.headerSet(HttpHeaders.MCP_SESSION_ID, init.session().getId())
309+
.body(initResult));
310+
}
311311
}
312312

313313
if (request.headerMap().containsKey(HttpHeaders.MCP_SESSION_ID) == false) {
@@ -322,17 +322,20 @@ private Mono<Entity> doHandlePost(Context request) throws Throwable {
322322
.body(new McpError("Session not found: " + sessionId));
323323
}
324324

325-
if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
325+
if (message instanceof McpSchema.JSONRPCResponse) {
326+
McpSchema.JSONRPCResponse jsonrpcResponse = (McpSchema.JSONRPCResponse)message;
326327
return session.accept(jsonrpcResponse).then(RxEntity.accepted().build());
327328
}
328-
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
329+
else if (message instanceof McpSchema.JSONRPCNotification) {
330+
McpSchema.JSONRPCNotification jsonrpcNotification = (McpSchema.JSONRPCNotification)message;
329331
return session.accept(jsonrpcNotification).then(RxEntity.accepted().build());
330332
}
331-
else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
333+
else if (message instanceof McpSchema.JSONRPCRequest) {
334+
McpSchema.JSONRPCRequest jsonrpcRequest = (McpSchema.JSONRPCRequest)message;
332335
return RxEntity.ok()
333336
.contentType(MimeType.TEXT_EVENT_STREAM_VALUE)
334337
.body(Flux.<SseEvent>create(sink -> {
335-
WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport(sink);
338+
WebRxStreamableMcpSessionTransport st = new WebRxStreamableMcpSessionTransport(sink);
336339
Mono<Void> stream = session.responseStream(jsonrpcRequest, st);
337340
Disposable streamSubscription = stream.onErrorComplete(err -> {
338341
sink.error(err);
@@ -390,11 +393,11 @@ private Mono<Entity> doHandleDelete(Context request) {
390393
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
391394
}
392395

393-
private class WebFluxStreamableMcpSessionTransport implements McpStreamableServerTransport {
396+
private class WebRxStreamableMcpSessionTransport implements McpStreamableServerTransport {
394397

395398
private final FluxSink<SseEvent> sink;
396399

397-
public WebFluxStreamableMcpSessionTransport(FluxSink<SseEvent> sink) {
400+
public WebRxStreamableMcpSessionTransport(FluxSink<SseEvent> sink) {
398401
this.sink = sink;
399402
}
400403

@@ -458,8 +461,11 @@ public static class Builder {
458461

459462
private String mcpEndpoint = "/mcp";
460463

461-
private McpTransportContextExtractor<Context> contextExtractor = (
462-
serverRequest) -> McpTransportContext.EMPTY;
464+
private McpTransportContextExtractor<Context> contextExtractor = (serverRequest) -> {
465+
Map<String,Object> context = new HashMap<>();
466+
context.put(Context.class.getName(), serverRequest);
467+
return McpTransportContext.create(context);
468+
};
463469

464470
private boolean disallowDelete;
465471

0 commit comments

Comments
 (0)