Skip to content

Commit 3f0a81f

Browse files
committed
fix: release owned HttpClient instances on transport close
1 parent 5e77762 commit 3f0a81f

File tree

9 files changed

+393
-14
lines changed

9 files changed

+393
-14
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
9292
* HTTP client for sending messages to the server. Uses HTTP POST over the message
9393
* endpoint
9494
*/
95-
private final HttpClient httpClient;
95+
private final OwnedHttpClient ownedHttpClient;
9696

9797
/** HTTP request builder for building requests to send messages to the server */
9898
private final HttpRequest.Builder requestBuilder;
@@ -140,11 +140,15 @@ public class HttpClientSseClientTransport implements McpClientTransport {
140140
this.baseUri = URI.create(baseUri);
141141
this.sseEndpoint = sseEndpoint;
142142
this.jsonMapper = jsonMapper;
143-
this.httpClient = httpClient;
143+
this.ownedHttpClient = OwnedHttpClient.create(httpClient);
144144
this.requestBuilder = requestBuilder;
145145
this.httpRequestCustomizer = httpRequestCustomizer;
146146
}
147147

148+
private HttpClient httpClient() {
149+
return this.ownedHttpClient.currentClientOrThrow();
150+
}
151+
148152
@Override
149153
public List<String> protocolVersions() {
150154
return List.of(ProtocolVersions.MCP_2024_11_05);
@@ -323,7 +327,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
323327
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
324328
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
325329
}).flatMap(requestBuilder -> Mono.create(sink -> {
326-
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
330+
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient()
327331
.sendAsync(requestBuilder.build(),
328332
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
329333
.exceptionallyCompose(e -> {
@@ -452,7 +456,7 @@ private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final Str
452456
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body, transportContext));
453457
}).flatMap(customizedBuilder -> {
454458
var request = customizedBuilder.build();
455-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
459+
return Mono.fromFuture(this.httpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()));
456460
});
457461
}
458462

@@ -472,7 +476,7 @@ public Mono<Void> closeGracefully() {
472476
if (subscription != null && !subscription.isDisposed()) {
473477
subscription.dispose();
474478
}
475-
});
479+
}).then(this.ownedHttpClient.releaseAfterClose());
476480
}
477481

478482
/**

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
8989
* HTTP client for sending messages to the server. Uses HTTP POST over the message
9090
* endpoint
9191
*/
92-
private final HttpClient httpClient;
92+
private final OwnedHttpClient ownedHttpClient;
9393

9494
/** HTTP request builder for building requests to send messages to the server */
9595
private final HttpRequest.Builder requestBuilder;
@@ -139,7 +139,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h
139139
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
140140
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
141141
this.jsonMapper = jsonMapper;
142-
this.httpClient = httpClient;
142+
this.ownedHttpClient = OwnedHttpClient.create(httpClient);
143143
this.requestBuilder = requestBuilder;
144144
this.baseUri = URI.create(baseUri);
145145
this.endpoint = endpoint;
@@ -155,6 +155,10 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h
155155
.get();
156156
}
157157

158+
private HttpClient httpClient() {
159+
return this.ownedHttpClient.currentClientOrThrow();
160+
}
161+
158162
@Override
159163
public List<String> protocolVersions() {
160164
return supportedProtocolVersions;
@@ -209,7 +213,7 @@ private Publisher<Void> createDelete(String sessionId) {
209213
return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
210214
}).flatMap(requestBuilder -> {
211215
var request = requestBuilder.build();
212-
return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
216+
return Mono.fromFuture(() -> this.httpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()));
213217
}).then();
214218
}
215219

@@ -237,10 +241,10 @@ public Mono<Void> closeGracefully() {
237241
return Mono.defer(() -> {
238242
logger.debug("Graceful close triggered");
239243
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
240-
if (currentSession != null) {
241-
return Mono.from(currentSession.closeGracefully());
242-
}
243-
return Mono.empty();
244+
Mono<Void> sessionClose = currentSession != null ? Mono.from(currentSession.closeGracefully())
245+
: Mono.empty();
246+
return sessionClose.onErrorResume(error -> this.ownedHttpClient.releaseAfterClose().then(Mono.error(error)))
247+
.then(this.ownedHttpClient.releaseAfterClose());
244248
});
245249
}
246250

@@ -280,7 +284,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
280284
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
281285
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
282286
})
283-
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient
287+
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient()
284288
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
285289
.whenComplete((response, throwable) -> {
286290
if (throwable != null) {
@@ -489,7 +493,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
489493
}).flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(responseEventSink -> {
490494

491495
// Create the async request with proper body subscriber selection
492-
Mono.fromFuture(this.httpClient
496+
Mono.fromFuture(this.httpClient()
493497
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))
494498
.whenComplete((response, throwable) -> {
495499
if (throwable != null) {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.http.HttpClient;
8+
import java.util.concurrent.atomic.AtomicReference;
9+
10+
import io.modelcontextprotocol.spec.McpTransportException;
11+
import io.modelcontextprotocol.util.Assert;
12+
import reactor.core.publisher.Mono;
13+
14+
/**
15+
* Tracks a transport-owned {@link HttpClient} so the transport can release its strong
16+
* reference on close without changing the public builder API.
17+
*/
18+
final class OwnedHttpClient {
19+
20+
private final AtomicReference<HttpClient> clientRef;
21+
22+
private OwnedHttpClient(HttpClient httpClient) {
23+
Assert.notNull(httpClient, "httpClient must not be null");
24+
this.clientRef = new AtomicReference<>(httpClient);
25+
}
26+
27+
static OwnedHttpClient create(HttpClient httpClient) {
28+
return new OwnedHttpClient(httpClient);
29+
}
30+
31+
/**
32+
* Return the currently owned client, or fail fast after the transport released it.
33+
*/
34+
HttpClient currentClientOrThrow() {
35+
HttpClient client = this.clientRef.get();
36+
if (client == null) {
37+
throw new McpTransportException("Transport is closed and no longer owns an HttpClient");
38+
}
39+
return client;
40+
}
41+
42+
Mono<Void> releaseAfterClose() {
43+
return Mono.fromRunnable(() -> this.clientRef.set(null));
44+
}
45+
46+
}

mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public void close() {
7878
@Override
7979
public Mono<Void> closeGracefully() {
8080
return Mono.from(this.onClose.apply(this.sessionId.get()))
81+
.onErrorResume(error -> Mono.fromRunnable(this.openConnections::dispose).then(Mono.error(error)))
8182
.then(Mono.fromRunnable(this.openConnections::dispose));
8283
}
8384

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.util.List;
8+
9+
final class HttpClientLeakTestSupport {
10+
11+
private HttpClientLeakTestSupport() {
12+
}
13+
14+
static int selectorManagerThreadCount() {
15+
return selectorManagerThreadNames().size();
16+
}
17+
18+
static List<String> selectorManagerThreadNames() {
19+
return Thread.getAllStackTraces()
20+
.keySet()
21+
.stream()
22+
.map(Thread::getName)
23+
.filter(name -> name.contains("HttpClient") && name.contains("SelectorManager"))
24+
.sorted()
25+
.toList();
26+
}
27+
28+
static int forceGcUntilStable() throws InterruptedException {
29+
int previousCount = Integer.MAX_VALUE;
30+
int stableIterations = 0;
31+
int currentCount = previousCount;
32+
33+
for (int i = 0; i < 40; i++) {
34+
System.gc();
35+
System.runFinalization();
36+
Thread.sleep(250);
37+
38+
currentCount = selectorManagerThreadCount();
39+
if (currentCount == previousCount) {
40+
stableIterations++;
41+
if (stableIterations >= 4) {
42+
break;
43+
}
44+
}
45+
else {
46+
stableIterations = 0;
47+
previousCount = currentCount;
48+
}
49+
}
50+
51+
return currentCount;
52+
}
53+
54+
static void pauseForSelectorStartup() throws InterruptedException {
55+
Thread.sleep(150);
56+
}
57+
58+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.Function;
10+
11+
import io.modelcontextprotocol.spec.McpSchema;
12+
import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper;
13+
import org.junit.jupiter.api.Test;
14+
import reactor.test.StepVerifier;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
class HttpClientSseClientTransportLeakTests {
19+
20+
@Test
21+
void closeDoesNotRetainOwnedHttpClientWhenClosedTransportRemainsReachable() throws Exception {
22+
try (LoopbackMcpHttpServer server = LoopbackMcpHttpServer.start()) {
23+
List<HttpClientSseClientTransport> retainedClosedTransports = new ArrayList<>();
24+
int selectorThreadsBefore = HttpClientLeakTestSupport.selectorManagerThreadCount();
25+
Function<reactor.core.publisher.Mono<McpSchema.JSONRPCMessage>, reactor.core.publisher.Mono<McpSchema.JSONRPCMessage>> handler = Function
26+
.identity();
27+
28+
for (int i = 0; i < 12; i++) {
29+
HttpClientSseClientTransport transport = HttpClientSseClientTransport
30+
.builder(server.baseUri().toString())
31+
.jsonMapper(new GsonMcpJsonMapper())
32+
.build();
33+
34+
StepVerifier.create(transport.connect(handler)).verifyComplete();
35+
HttpClientLeakTestSupport.pauseForSelectorStartup();
36+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
37+
retainedClosedTransports.add(transport);
38+
}
39+
40+
int selectorThreadsAfter = HttpClientLeakTestSupport.forceGcUntilStable();
41+
42+
assertThat(selectorThreadsAfter)
43+
.describedAs(
44+
"closed transports should not keep owned HttpClient instances alive, remaining threads: %s",
45+
HttpClientLeakTestSupport.selectorManagerThreadNames())
46+
.isLessThanOrEqualTo(selectorThreadsBefore + 1);
47+
}
48+
}
49+
50+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.util.Map;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.function.Function;
11+
12+
import io.modelcontextprotocol.spec.McpSchema;
13+
import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper;
14+
import org.junit.jupiter.api.Test;
15+
import reactor.test.StepVerifier;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
class HttpClientStreamableHttpTransportLeakTests {
20+
21+
@Test
22+
void closeDoesNotRetainOwnedHttpClientWhenClosedTransportRemainsReachable() throws Exception {
23+
try (LoopbackMcpHttpServer server = LoopbackMcpHttpServer.start()) {
24+
List<HttpClientStreamableHttpTransport> retainedClosedTransports = new ArrayList<>();
25+
int selectorThreadsBefore = HttpClientLeakTestSupport.selectorManagerThreadCount();
26+
Function<reactor.core.publisher.Mono<McpSchema.JSONRPCMessage>, reactor.core.publisher.Mono<McpSchema.JSONRPCMessage>> handler = Function
27+
.identity();
28+
29+
for (int i = 0; i < 12; i++) {
30+
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport
31+
.builder(server.baseUri().toString())
32+
.jsonMapper(new GsonMcpJsonMapper())
33+
.build();
34+
35+
StepVerifier.create(transport.connect(handler)).verifyComplete();
36+
StepVerifier.create(transport.sendMessage(
37+
new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, "ping", Map.of("iteration", i))))
38+
.verifyComplete();
39+
HttpClientLeakTestSupport.pauseForSelectorStartup();
40+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
41+
retainedClosedTransports.add(transport);
42+
}
43+
44+
int selectorThreadsAfter = HttpClientLeakTestSupport.forceGcUntilStable();
45+
46+
assertThat(selectorThreadsAfter)
47+
.describedAs(
48+
"closed transports should not keep owned HttpClient instances alive, remaining threads: %s",
49+
HttpClientLeakTestSupport.selectorManagerThreadNames())
50+
.isLessThanOrEqualTo(selectorThreadsBefore + 1);
51+
}
52+
}
53+
54+
}

0 commit comments

Comments
 (0)