Skip to content

Commit 475ddeb

Browse files
committed
fix(http): reduce latency on subsequent StreamableHttp calls
1 parent 8e22aa2 commit 475ddeb

4 files changed

Lines changed: 224 additions & 69 deletions

File tree

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ impl StreamableHttpClientTransport<reqwest::Client> {
262262
/// This method requires the `transport-streamable-http-client-reqwest` feature.
263263
pub fn from_uri(uri: impl Into<Arc<str>>) -> Self {
264264
StreamableHttpClientTransport::with_client(
265-
reqwest::Client::default(),
265+
Self::default_http_client(),
266266
StreamableHttpClientTransportConfig {
267267
uri: uri.into(),
268268
auth_header: None,
@@ -277,7 +277,21 @@ impl StreamableHttpClientTransport<reqwest::Client> {
277277
///
278278
/// * `config` - The config to use with this transport
279279
pub fn from_config(config: StreamableHttpClientTransportConfig) -> Self {
280-
StreamableHttpClientTransport::with_client(reqwest::Client::default(), config)
280+
StreamableHttpClientTransport::with_client(Self::default_http_client(), config)
281+
}
282+
283+
/// Build the default reqwest client for this transport.
284+
///
285+
/// Disables idle connection pooling because POST SSE response bodies
286+
/// cannot be fully consumed before the next request starts, causing
287+
/// reqwest to stall on pool checkout (~40 ms). The long-lived GET
288+
/// SSE stream already holds its own connection, so there is no
289+
/// benefit to pooling POST connections with HTTP/1.1.
290+
fn default_http_client() -> reqwest::Client {
291+
reqwest::Client::builder()
292+
.pool_max_idle_per_host(0)
293+
.build()
294+
.expect("failed to build default reqwest client")
281295
}
282296
}
283297

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 63 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,39 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
281281
}
282282

283283
impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
284+
/// Convert a raw SSE stream into a JSON-RPC message stream without
285+
/// reconnection logic. Used for per-request POST SSE responses where
286+
/// we close the stream after the first response and want the underlying
287+
/// HTTP connection to be returned to the pool promptly.
288+
fn raw_sse_to_jsonrpc(
289+
stream: BoxedSseStream,
290+
) -> impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>> + Send + 'static
291+
{
292+
stream.filter_map(|event| async {
293+
match event {
294+
Err(e) => Some(Err(StreamableHttpError::Sse(e))),
295+
Ok(sse) => {
296+
let is_message =
297+
matches!(sse.event.as_deref(), None | Some("") | Some("message"));
298+
if !is_message {
299+
return None;
300+
}
301+
let data = sse.data?;
302+
if data.trim().is_empty() {
303+
return None;
304+
}
305+
match serde_json::from_str::<ServerJsonRpcMessage>(&data) {
306+
Ok(msg) => Some(Ok(msg)),
307+
Err(e) => {
308+
tracing::debug!("failed to deserialize server message: {e}");
309+
None
310+
}
311+
}
312+
}
313+
}
314+
})
315+
}
316+
284317
async fn execute_sse_stream(
285318
sse_stream: impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>>
286319
+ Send
@@ -303,14 +336,26 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
303336
let Some(message) = message.transpose()? else {
304337
break;
305338
};
306-
let is_response = matches!(message, ServerJsonRpcMessage::Response(_));
339+
let is_response = matches!(
340+
message,
341+
ServerJsonRpcMessage::Response(_) | ServerJsonRpcMessage::Error(_)
342+
);
307343
let yield_result = sse_worker_tx.send(message).await;
308344
if yield_result.is_err() {
309345
tracing::trace!("streamable http transport worker dropped, exiting");
310346
break;
311347
}
312348
if close_on_response && is_response {
313-
tracing::debug!("got response, closing sse stream");
349+
tracing::debug!("got response, draining sse stream for connection reuse");
350+
// Drain remaining stream bytes so the HTTP/1.1 connection can
351+
// be returned to the pool instead of being discarded. The
352+
// server closes the channel shortly after sending the response,
353+
// so this normally completes in microseconds on localhost. The
354+
// timeout guards against servers that keep the stream open.
355+
let _ = tokio::time::timeout(std::time::Duration::from_millis(50), async {
356+
while sse_stream.next().await.is_some() {}
357+
})
358+
.await;
314359
break;
315360
}
316361
}
@@ -718,38 +763,12 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
718763
Ok(())
719764
}
720765
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
721-
if let Some(sid) = &session_id {
722-
let sse_stream = SseAutoReconnectStream::new(
723-
stream,
724-
StreamableHttpClientReconnect {
725-
client: self.client.clone(),
726-
session_id: sid.clone(),
727-
uri: config.uri.clone(),
728-
auth_header: config.auth_header.clone(),
729-
custom_headers: protocol_headers
730-
.clone(),
731-
},
732-
self.config.retry_config.clone(),
733-
);
734-
streams.spawn(Self::execute_sse_stream(
735-
sse_stream,
736-
sse_worker_tx.clone(),
737-
true,
738-
transport_task_ct.child_token(),
739-
));
740-
} else {
741-
let sse_stream =
742-
SseAutoReconnectStream::never_reconnect(
743-
stream,
744-
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
745-
);
746-
streams.spawn(Self::execute_sse_stream(
747-
sse_stream,
748-
sse_worker_tx.clone(),
749-
true,
750-
transport_task_ct.child_token(),
751-
));
752-
}
766+
streams.spawn(Self::execute_sse_stream(
767+
Self::raw_sse_to_jsonrpc(stream),
768+
sse_worker_tx.clone(),
769+
true,
770+
transport_task_ct.child_token(),
771+
));
753772
tracing::trace!("got new sse stream after re-init");
754773
Ok(())
755774
}
@@ -769,36 +788,16 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
769788
Ok(())
770789
}
771790
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
772-
if let Some(session_id) = &session_id {
773-
let sse_stream = SseAutoReconnectStream::new(
774-
stream,
775-
StreamableHttpClientReconnect {
776-
client: self.client.clone(),
777-
session_id: session_id.clone(),
778-
uri: config.uri.clone(),
779-
auth_header: config.auth_header.clone(),
780-
custom_headers: protocol_headers.clone(),
781-
},
782-
self.config.retry_config.clone(),
783-
);
784-
streams.spawn(Self::execute_sse_stream(
785-
sse_stream,
786-
sse_worker_tx.clone(),
787-
true,
788-
transport_task_ct.child_token(),
789-
));
790-
} else {
791-
let sse_stream = SseAutoReconnectStream::never_reconnect(
792-
stream,
793-
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
794-
);
795-
streams.spawn(Self::execute_sse_stream(
796-
sse_stream,
797-
sse_worker_tx.clone(),
798-
true,
799-
transport_task_ct.child_token(),
800-
));
801-
}
791+
// Per-request POST SSE streams use a thin
792+
// adapter instead of SseAutoReconnectStream so
793+
// the stream ends immediately when the server
794+
// closes the channel, enabling connection reuse.
795+
streams.spawn(Self::execute_sse_stream(
796+
Self::raw_sse_to_jsonrpc(stream),
797+
sse_worker_tx.clone(),
798+
true,
799+
transport_task_ct.child_token(),
800+
));
802801
tracing::trace!("got new sse stream");
803802
Ok(())
804803
}

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ impl LocalSessionWorker {
479479
{
480480
OutboundChannel::RequestWise {
481481
id: *id,
482-
close: false,
482+
close: true,
483483
}
484484
} else {
485485
OutboundChannel::Common
@@ -492,7 +492,7 @@ impl LocalSessionWorker {
492492
{
493493
OutboundChannel::RequestWise {
494494
id: *id,
495-
close: false,
495+
close: true,
496496
}
497497
} else {
498498
OutboundChannel::Common
@@ -510,7 +510,11 @@ impl LocalSessionWorker {
510510
if let Some(request_wise) = self.tx_router.get_mut(&id) {
511511
request_wise.tx.send(message).await;
512512
if close {
513-
self.tx_router.remove(&id);
513+
if let Some(channel) = self.tx_router.remove(&id) {
514+
for resource in channel.resources {
515+
self.resource_router.remove(&resource);
516+
}
517+
}
514518
}
515519
} else {
516520
return Err(SessionError::ChannelClosed(Some(id)));
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#![cfg(all(
2+
feature = "transport-streamable-http-client",
3+
feature = "transport-streamable-http-client-reqwest",
4+
feature = "transport-streamable-http-server",
5+
not(feature = "local")
6+
))]
7+
8+
use std::time::Instant;
9+
10+
use rmcp::{
11+
ServerHandler, ServiceExt,
12+
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
13+
model::{CallToolRequestParams, ClientInfo, ServerCapabilities, ServerInfo},
14+
schemars, tool, tool_handler, tool_router,
15+
transport::{
16+
StreamableHttpClientTransport,
17+
streamable_http_client::StreamableHttpClientTransportConfig,
18+
streamable_http_server::{
19+
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
20+
},
21+
},
22+
};
23+
use tokio_util::sync::CancellationToken;
24+
25+
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
26+
struct SumRequest {
27+
a: i32,
28+
b: i32,
29+
}
30+
31+
#[derive(Debug, Clone)]
32+
struct EchoServer {
33+
tool_router: ToolRouter<Self>,
34+
}
35+
36+
impl EchoServer {
37+
fn new() -> Self {
38+
Self {
39+
tool_router: Self::tool_router(),
40+
}
41+
}
42+
}
43+
44+
#[tool_router]
45+
impl EchoServer {
46+
#[tool(description = "Sum two numbers")]
47+
fn sum(&self, Parameters(SumRequest { a, b }): Parameters<SumRequest>) -> String {
48+
(a + b).to_string()
49+
}
50+
}
51+
52+
#[tool_handler(router = self.tool_router)]
53+
impl ServerHandler for EchoServer {
54+
fn get_info(&self) -> ServerInfo {
55+
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
56+
}
57+
}
58+
59+
/// Verify that subsequent tool calls do not regress in latency due to
60+
/// HTTP/1.1 connection pool exhaustion. Before the fix, each POST SSE
61+
/// response was dropped without fully consuming the body, preventing
62+
/// connection reuse and forcing a new TCP connection (~40 ms) per call.
63+
#[tokio::test]
64+
async fn test_subsequent_tool_calls_reuse_connections() -> anyhow::Result<()> {
65+
let ct = CancellationToken::new();
66+
67+
let service: StreamableHttpService<EchoServer, LocalSessionManager> =
68+
StreamableHttpService::new(
69+
|| Ok(EchoServer::new()),
70+
Default::default(),
71+
StreamableHttpServerConfig::default()
72+
.with_sse_keep_alive(None)
73+
.with_cancellation_token(ct.child_token()),
74+
);
75+
76+
let router = axum::Router::new().nest_service("/mcp", service);
77+
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
78+
let addr = listener.local_addr()?;
79+
80+
let server_handle = tokio::spawn({
81+
let ct = ct.clone();
82+
async move {
83+
let _ = axum::serve(listener, router)
84+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
85+
.await;
86+
}
87+
});
88+
89+
let transport = StreamableHttpClientTransport::from_config(
90+
StreamableHttpClientTransportConfig::with_uri(format!("http://{addr}/mcp")),
91+
);
92+
let client = ClientInfo::default().serve(transport).await?;
93+
94+
// Warm up: first call may include one-time setup costs.
95+
let args: serde_json::Map<String, serde_json::Value> =
96+
serde_json::from_value(serde_json::json!({"a": 1, "b": 2}))?;
97+
let _ = client
98+
.call_tool(CallToolRequestParams::new("sum").with_arguments(args))
99+
.await?;
100+
101+
// Measure subsequent calls.
102+
let mut durations = Vec::new();
103+
for i in 0..5i32 {
104+
let args: serde_json::Map<String, serde_json::Value> =
105+
serde_json::from_value(serde_json::json!({"a": i, "b": i + 1}))?;
106+
let start = Instant::now();
107+
let result = client
108+
.call_tool(CallToolRequestParams::new("sum").with_arguments(args))
109+
.await?;
110+
let elapsed = start.elapsed();
111+
durations.push(elapsed);
112+
113+
assert!(
114+
result.is_error != Some(true),
115+
"tool call should succeed, got error: {:?}",
116+
result.content
117+
);
118+
}
119+
120+
let _ = client.cancel().await;
121+
ct.cancel();
122+
server_handle.await?;
123+
124+
// With connection reuse, localhost calls should complete well under 20 ms.
125+
// Before the fix, they consistently took ~42 ms due to new TCP connections.
126+
let max_allowed = std::time::Duration::from_millis(20);
127+
for (i, d) in durations.iter().enumerate() {
128+
assert!(
129+
*d < max_allowed,
130+
"call {} took {:?}, expected < {:?} (connection reuse may be broken)",
131+
i + 1,
132+
d,
133+
max_allowed,
134+
);
135+
}
136+
137+
Ok(())
138+
}

0 commit comments

Comments
 (0)