forked from modelcontextprotocol/java-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebFluxSseIntegrationTests.java
More file actions
94 lines (71 loc) · 3.02 KB
/
Copy pathWebFluxSseIntegrationTests.java
File metadata and controls
94 lines (71 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
* Copyright 2024 - 2024 the original author or authors.
*/
package io.modelcontextprotocol;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunctions;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServer.AsyncSpecification;
import io.modelcontextprotocol.server.McpServer.SingleSessionSyncSpecification;
import io.modelcontextprotocol.server.TestUtil;
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
@Timeout(15)
class WebFluxSseIntegrationTests extends AbstractMcpClientServerIntegrationTests {
private static final int PORT = TestUtil.findAvailablePort();
private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse";
private static final String CUSTOM_MESSAGE_ENDPOINT = "/otherPath/mcp/message";
private DisposableServer httpServer;
private WebFluxSseServerTransportProvider mcpServerTransportProvider;
@Override
protected void prepareClients(int port, String mcpEndpoint) {
clientBuilders
.put("httpclient",
McpClient.sync(HttpClientSseClientTransport.builder("http://localhost:" + PORT)
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
.build()).requestTimeout(Duration.ofHours(10)));
clientBuilders.put("webflux",
McpClient
.sync(WebFluxSseClientTransport.builder(WebClient.builder().baseUrl("http://localhost:" + PORT))
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
.build())
.requestTimeout(Duration.ofHours(10)));
}
@Override
protected AsyncSpecification<?> prepareAsyncServerBuilder() {
return McpServer.async(mcpServerTransportProvider);
}
@Override
protected SingleSessionSyncSpecification prepareSyncServerBuilder() {
return McpServer.sync(mcpServerTransportProvider);
}
@BeforeEach
public void before() {
this.mcpServerTransportProvider = new WebFluxSseServerTransportProvider.Builder()
.objectMapper(new ObjectMapper())
.messageEndpoint(CUSTOM_MESSAGE_ENDPOINT)
.sseEndpoint(CUSTOM_SSE_ENDPOINT)
.build();
HttpHandler httpHandler = RouterFunctions.toHttpHandler(mcpServerTransportProvider.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
this.httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
prepareClients(PORT, null);
}
@AfterEach
public void after() {
if (httpServer != null) {
httpServer.disposeNow();
}
}
}