Skip to content

Commit 0b19ab1

Browse files
committed
test: add resume after completion test
1 parent 31e21af commit 0b19ab1

1 file changed

Lines changed: 101 additions & 0 deletions

File tree

crates/rmcp/tests/test_streamable_http_priming.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,107 @@ async fn test_request_wise_priming_includes_http_request_id() -> anyhow::Result<
197197
Ok(())
198198
}
199199

200+
#[tokio::test]
201+
async fn test_resume_after_request_wise_channel_completed() -> anyhow::Result<()> {
202+
let ct = CancellationToken::new();
203+
204+
let service: StreamableHttpService<Calculator, LocalSessionManager> =
205+
StreamableHttpService::new(
206+
|| Ok(Calculator::new()),
207+
Default::default(),
208+
StreamableHttpServerConfig::default()
209+
.with_sse_keep_alive(None)
210+
.with_cancellation_token(ct.child_token()),
211+
);
212+
213+
let router = axum::Router::new().nest_service("/mcp", service);
214+
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
215+
let addr = tcp_listener.local_addr()?;
216+
217+
let handle = tokio::spawn({
218+
let ct = ct.clone();
219+
async move {
220+
let _ = axum::serve(tcp_listener, router)
221+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
222+
.await;
223+
}
224+
});
225+
226+
let client = reqwest::Client::new();
227+
228+
// Initialize session
229+
let response = client
230+
.post(format!("http://{addr}/mcp"))
231+
.header("Content-Type", "application/json")
232+
.header("Accept", "application/json, text/event-stream")
233+
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
234+
.send()
235+
.await?;
236+
assert_eq!(response.status(), 200);
237+
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
238+
239+
// Complete handshake
240+
let status = client
241+
.post(format!("http://{addr}/mcp"))
242+
.header("Content-Type", "application/json")
243+
.header("Accept", "application/json, text/event-stream")
244+
.header("mcp-session-id", session_id.to_string())
245+
.header("Mcp-Protocol-Version", "2025-06-18")
246+
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
247+
.send()
248+
.await?
249+
.status();
250+
assert_eq!(status, 202);
251+
252+
// Call a tool and consume the full response (channel completes)
253+
let body = client
254+
.post(format!("http://{addr}/mcp"))
255+
.header("Content-Type", "application/json")
256+
.header("Accept", "application/json, text/event-stream")
257+
.header("mcp-session-id", session_id.to_string())
258+
.header("Mcp-Protocol-Version", "2025-06-18")
259+
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
260+
.send()
261+
.await?
262+
.text()
263+
.await?;
264+
265+
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
266+
assert!(
267+
events.len() >= 2,
268+
"expected priming + response, got: {body}"
269+
);
270+
assert!(events[0].contains("id: 0/0"));
271+
assert!(events[1].contains(r#""id":2"#));
272+
273+
// Resume with Last-Event-ID after the channel has completed.
274+
// The server returns 200 — either with replayed cached events
275+
// (if the channel is still retained) or an empty stream (if the
276+
// session worker hasn't processed the completion yet).
277+
let resume = client
278+
.get(format!("http://{addr}/mcp"))
279+
.header("Accept", "text/event-stream")
280+
.header("mcp-session-id", session_id.to_string())
281+
.header("Mcp-Protocol-Version", "2025-06-18")
282+
.header("last-event-id", "0/0")
283+
.send()
284+
.await?;
285+
assert_eq!(resume.status(), 200);
286+
287+
let resume_body = resume.text().await?;
288+
// The stream should complete (not hang), regardless of whether
289+
// it contains replayed events or is empty.
290+
assert!(
291+
!resume_body.contains("standalone"),
292+
"should not receive events from a different stream"
293+
);
294+
295+
ct.cancel();
296+
handle.await?;
297+
298+
Ok(())
299+
}
300+
200301
#[tokio::test]
201302
async fn test_completed_cache_ttl_eviction() -> anyhow::Result<()> {
202303
use std::sync::Arc;

0 commit comments

Comments
 (0)