Skip to content

Commit c9ecfe0

Browse files
committed
fix(http): use raw SSE stream for POST responses
The SseAutoReconnectStream wrapper introduces a 3-second reconnect delay (from the priming event retry field) when the inner stream ends, preventing the 50ms drain from completing. Use a thin raw_sse_to_jsonrpc adapter for per-request POST SSE streams instead, so the stream ends immediately when the server closes the channel.
1 parent 114fc84 commit c9ecfe0

1 file changed

Lines changed: 49 additions & 62 deletions

File tree

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 49 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,39 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
281281
}
282282

283283
impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
284+
/// Convert a raw SSE stream into a JSON-RPC message stream without
285+
/// reconnection logic. Used for per-request POST SSE responses where
286+
/// we close the stream after the first response and want the underlying
287+
/// HTTP connection to be returned to the pool promptly.
288+
fn raw_sse_to_jsonrpc(
289+
stream: BoxedSseStream,
290+
) -> impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>> + Send + 'static
291+
{
292+
stream.filter_map(|event| async {
293+
match event {
294+
Err(e) => Some(Err(StreamableHttpError::Sse(e))),
295+
Ok(sse) => {
296+
let is_message =
297+
matches!(sse.event.as_deref(), None | Some("") | Some("message"));
298+
if !is_message {
299+
return None;
300+
}
301+
let data = sse.data?;
302+
if data.trim().is_empty() {
303+
return None;
304+
}
305+
match serde_json::from_str::<ServerJsonRpcMessage>(&data) {
306+
Ok(msg) => Some(Ok(msg)),
307+
Err(e) => {
308+
tracing::debug!("failed to deserialize server message: {e}");
309+
None
310+
}
311+
}
312+
}
313+
}
314+
})
315+
}
316+
284317
async fn execute_sse_stream(
285318
sse_stream: impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>>
286319
+ Send
@@ -730,38 +763,12 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
730763
Ok(())
731764
}
732765
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
733-
if let Some(sid) = &session_id {
734-
let sse_stream = SseAutoReconnectStream::new(
735-
stream,
736-
StreamableHttpClientReconnect {
737-
client: self.client.clone(),
738-
session_id: sid.clone(),
739-
uri: config.uri.clone(),
740-
auth_header: config.auth_header.clone(),
741-
custom_headers: protocol_headers
742-
.clone(),
743-
},
744-
self.config.retry_config.clone(),
745-
);
746-
streams.spawn(Self::execute_sse_stream(
747-
sse_stream,
748-
sse_worker_tx.clone(),
749-
true,
750-
transport_task_ct.child_token(),
751-
));
752-
} else {
753-
let sse_stream =
754-
SseAutoReconnectStream::never_reconnect(
755-
stream,
756-
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
757-
);
758-
streams.spawn(Self::execute_sse_stream(
759-
sse_stream,
760-
sse_worker_tx.clone(),
761-
true,
762-
transport_task_ct.child_token(),
763-
));
764-
}
766+
streams.spawn(Self::execute_sse_stream(
767+
Self::raw_sse_to_jsonrpc(stream),
768+
sse_worker_tx.clone(),
769+
true,
770+
transport_task_ct.child_token(),
771+
));
765772
tracing::trace!("got new sse stream after re-init");
766773
Ok(())
767774
}
@@ -781,36 +788,16 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
781788
Ok(())
782789
}
783790
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
784-
if let Some(session_id) = &session_id {
785-
let sse_stream = SseAutoReconnectStream::new(
786-
stream,
787-
StreamableHttpClientReconnect {
788-
client: self.client.clone(),
789-
session_id: session_id.clone(),
790-
uri: config.uri.clone(),
791-
auth_header: config.auth_header.clone(),
792-
custom_headers: protocol_headers.clone(),
793-
},
794-
self.config.retry_config.clone(),
795-
);
796-
streams.spawn(Self::execute_sse_stream(
797-
sse_stream,
798-
sse_worker_tx.clone(),
799-
true,
800-
transport_task_ct.child_token(),
801-
));
802-
} else {
803-
let sse_stream = SseAutoReconnectStream::never_reconnect(
804-
stream,
805-
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
806-
);
807-
streams.spawn(Self::execute_sse_stream(
808-
sse_stream,
809-
sse_worker_tx.clone(),
810-
true,
811-
transport_task_ct.child_token(),
812-
));
813-
}
791+
// Per-request POST SSE streams use a thin
792+
// adapter instead of SseAutoReconnectStream so
793+
// the stream ends immediately when the server
794+
// closes the channel, enabling connection reuse.
795+
streams.spawn(Self::execute_sse_stream(
796+
Self::raw_sse_to_jsonrpc(stream),
797+
sse_worker_tx.clone(),
798+
true,
799+
transport_task_ct.child_token(),
800+
));
814801
tracing::trace!("got new sse stream");
815802
Ok(())
816803
}

0 commit comments

Comments
 (0)