Skip to content

Commit 1fe5d1e

Browse files
fix(streamable-http): map stale session 401 to status-aware error (#709)
* fix(streamable-http): map stale session 401 to status-aware error * test(streamable-http): expect 404 for stale session
1 parent 28beb95 commit 1fe5d1e

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,15 @@ impl StreamableHttpClient for reqwest::Client {
186186
) {
187187
return Ok(StreamableHttpPostResponse::Accepted);
188188
}
189+
if !status.is_success() {
190+
let body = response
191+
.text()
192+
.await
193+
.unwrap_or_else(|_| "<failed to read response body>".to_owned());
194+
return Err(StreamableHttpError::UnexpectedServerResponse(Cow::Owned(
195+
format!("HTTP {status}: {body}"),
196+
)));
197+
}
189198
let content_type = response.headers().get(reqwest::header::CONTENT_TYPE);
190199
let session_id = response.headers().get(HEADER_SESSION_ID);
191200
let session_id = session_id
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#![cfg(all(
2+
feature = "transport-streamable-http-client",
3+
feature = "transport-streamable-http-client-reqwest",
4+
feature = "transport-streamable-http-server"
5+
))]
6+
7+
use std::{collections::HashMap, sync::Arc};
8+
9+
use rmcp::{
10+
model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId},
11+
transport::{
12+
streamable_http_client::{StreamableHttpClient, StreamableHttpError},
13+
streamable_http_server::{
14+
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
15+
},
16+
},
17+
};
18+
use tokio_util::sync::CancellationToken;
19+
20+
mod common;
21+
use common::calculator::Calculator;
22+
23+
#[tokio::test]
24+
async fn test_stale_session_id_returns_status_aware_error() -> anyhow::Result<()> {
25+
let ct = CancellationToken::new();
26+
let service: StreamableHttpService<Calculator, LocalSessionManager> =
27+
StreamableHttpService::new(
28+
|| Ok(Calculator::new()),
29+
Default::default(),
30+
StreamableHttpServerConfig {
31+
stateful_mode: true,
32+
sse_keep_alive: None,
33+
cancellation_token: ct.child_token(),
34+
..Default::default()
35+
},
36+
);
37+
38+
let router = axum::Router::new().nest_service("/mcp", service);
39+
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
40+
let addr = listener.local_addr()?;
41+
42+
let handle = tokio::spawn({
43+
let ct = ct.clone();
44+
async move {
45+
let _ = axum::serve(listener, router)
46+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
47+
.await;
48+
}
49+
});
50+
51+
let uri = Arc::<str>::from(format!("http://{addr}/mcp"));
52+
let message = ClientJsonRpcMessage::request(
53+
ClientRequest::PingRequest(PingRequest::default()),
54+
RequestId::Number(1),
55+
);
56+
57+
let client = reqwest::Client::new();
58+
let result = client
59+
.post_message(
60+
uri.clone(),
61+
message,
62+
Some(Arc::from("stale-session-id")),
63+
None,
64+
HashMap::new(),
65+
)
66+
.await;
67+
68+
let raw_response = reqwest::Client::new()
69+
.post(uri.as_ref())
70+
.header("accept", "application/json, text/event-stream")
71+
.header("content-type", "application/json")
72+
.header("mcp-session-id", "stale-session-id")
73+
.body(r#"{"jsonrpc":"2.0","id":1,"method":"ping","params":{}}"#)
74+
.send()
75+
.await?;
76+
77+
assert_eq!(raw_response.status(), reqwest::StatusCode::NOT_FOUND);
78+
match result {
79+
Err(StreamableHttpError::UnexpectedServerResponse(message)) => {
80+
let message = message.to_string();
81+
assert!(
82+
message.contains("404"),
83+
"error should include HTTP status code, got: {message}"
84+
);
85+
assert!(
86+
message.to_ascii_lowercase().contains("session not found"),
87+
"error should include session-not-found hint, got: {message}"
88+
);
89+
}
90+
other => panic!("expected UnexpectedServerResponse, got: {other:?}"),
91+
}
92+
93+
ct.cancel();
94+
handle.await?;
95+
96+
Ok(())
97+
}

0 commit comments

Comments
 (0)