Skip to content

Commit 750332d

Browse files
committed
test: add long-running tool resume tests
1 parent d4ea2e9 commit 750332d

2 files changed

Lines changed: 43 additions & 39 deletions

File tree

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -356,29 +356,27 @@ pub struct StreamableHttpMessageReceiver {
356356

357357
impl LocalSessionWorker {
358358
fn unregister_resource(&mut self, resource: &ResourceKey) {
359-
if let Some(http_request_id) = self.resource_router.remove(resource) {
360-
tracing::trace!(?resource, http_request_id, "unregister resource");
361-
if let Some(channel) = self.tx_router.get_mut(&http_request_id) {
362-
// It's okey to do so, since we don't handle batch json rpc request anymore
363-
// and this can be refactored after the batch request is removed in the coming version.
364-
if channel.resources.is_empty() || matches!(resource, ResourceKey::McpRequestId(_))
365-
{
366-
tracing::debug!(http_request_id, "close http request wise channel");
367-
if let Some(channel) = self.tx_router.get_mut(&http_request_id) {
368-
for resource in channel.resources.drain() {
369-
self.resource_router.remove(&resource);
370-
}
371-
channel.completed_at = Some(Instant::now());
372-
// Close the sender so the client's SSE stream ends,
373-
// but keep the entry so the cache is available for
374-
// late resume requests.
375-
let (closed_tx, _) = tokio::sync::mpsc::channel(1);
376-
channel.tx.tx = closed_tx;
377-
}
378-
}
379-
} else {
380-
tracing::warn!(http_request_id, "http request wise channel not found");
381-
}
359+
let Some(http_request_id) = self.resource_router.remove(resource) else {
360+
return;
361+
};
362+
tracing::trace!(?resource, http_request_id, "unregister resource");
363+
let Some(channel) = self.tx_router.get_mut(&http_request_id) else {
364+
tracing::warn!(http_request_id, "http request wise channel not found");
365+
return;
366+
};
367+
if !channel.resources.is_empty() && !matches!(resource, ResourceKey::McpRequestId(_)) {
368+
return;
369+
}
370+
tracing::debug!(http_request_id, "close http request wise channel");
371+
let resources: Vec<_> = channel.resources.drain().collect();
372+
channel.completed_at = Some(Instant::now());
373+
// Close the sender so the client's SSE stream ends,
374+
// but keep the entry so the cache is available for
375+
// late resume requests.
376+
let (closed_tx, _) = tokio::sync::mpsc::channel(1);
377+
channel.tx.tx = closed_tx;
378+
for resource in resources {
379+
self.resource_router.remove(&resource);
382380
}
383381
}
384382
fn register_resource(&mut self, resource: ResourceKey, http_request_id: HttpRequestId) {

crates/rmcp/tests/test_streamable_http_priming.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -270,34 +270,40 @@ async fn test_resume_after_request_wise_channel_completed() -> anyhow::Result<()
270270
assert!(events[0].contains("id: 0/0"));
271271
assert!(events[1].contains(r#""id":2"#));
272272

273+
// Allow the session worker to finish processing the response
274+
// and mark the channel as completed before resuming.
275+
tokio::time::sleep(Duration::from_millis(100)).await;
276+
273277
// Resume with Last-Event-ID after the channel has completed.
274-
// The cached events should be replayed and the stream should end.
278+
// The cached events should be replayed.
275279
let resume_response = client
276280
.get(format!("http://{addr}/mcp"))
277281
.header("Accept", "text/event-stream")
278282
.header("mcp-session-id", session_id.to_string())
279283
.header("Mcp-Protocol-Version", "2025-06-18")
280284
.header("last-event-id", "0/0")
281-
.timeout(std::time::Duration::from_secs(5))
282285
.send()
283286
.await?;
284287
assert_eq!(resume_response.status(), 200);
285288

286-
let resume_body = resume_response.text().await?;
287-
let resume_events: Vec<&str> = resume_body
288-
.split("\n\n")
289-
.filter(|e| !e.is_empty())
290-
.collect();
291-
assert!(
292-
!resume_events.is_empty(),
293-
"expected replayed events on resume, got empty"
294-
);
295-
296-
// The replayed event should contain the original response
297-
let replayed = resume_events[0];
289+
// Read incrementally — the resumed stream may stay open as a
290+
// standalone fallback, so we read just enough to verify replay.
291+
use futures::StreamExt;
292+
let mut stream = resume_response.bytes_stream();
293+
let mut buf = String::new();
294+
let read_result = tokio::time::timeout(Duration::from_secs(10), async {
295+
while let Some(Ok(chunk)) = stream.next().await {
296+
buf.push_str(&String::from_utf8_lossy(&chunk));
297+
if buf.contains(r#""id":2"#) {
298+
return true;
299+
}
300+
}
301+
false
302+
})
303+
.await;
298304
assert!(
299-
replayed.contains(r#""id":2"#),
300-
"replayed event should contain the tool response, got: {replayed}"
305+
read_result.unwrap_or(false),
306+
"expected replayed tool response on resume, got: {buf}"
301307
);
302308

303309
ct.cancel();

0 commit comments

Comments
 (0)