Skip to content

Commit 3ec3a2a

Browse files
committed
fix: add init_timeout for streamable-http sessions
1 parent 020a38b commit 3ec3a2a

2 files changed

Lines changed: 90 additions & 7 deletions

File tree

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,8 @@ pub enum LocalSessionWorkerError {
916916
FailToHandleMessage(SessionError),
917917
#[error("keep alive timeout after {}ms", _0.as_millis())]
918918
KeepAliveTimeout(Duration),
919+
#[error("init timeout after {}ms", _0.as_millis())]
920+
InitTimeout(Duration),
919921
#[error("Transport closed")]
920922
TransportClosed,
921923
#[error("Tokio join error {0}")]
@@ -945,13 +947,25 @@ impl Worker for LocalSessionWorker {
945947
FromHttpService(SessionEvent),
946948
FromHandler(WorkerSendRequest<LocalSessionWorker>),
947949
}
948-
// waiting for initialize request
949-
let evt = self.event_rx.recv().await.ok_or_else(|| {
950-
WorkerQuitReason::fatal(
951-
LocalSessionWorkerError::TransportTerminated,
952-
"get initialize request",
953-
)
954-
})?;
950+
let init_timeout = self.session_config.init_timeout.unwrap_or(Duration::MAX);
951+
let init_timeout_sleep = tokio::time::sleep(init_timeout);
952+
let evt = tokio::select! {
953+
evt = self.event_rx.recv() => evt.ok_or_else(|| {
954+
WorkerQuitReason::fatal(
955+
LocalSessionWorkerError::TransportTerminated,
956+
"get initialize request",
957+
)
958+
})?,
959+
_ = context.cancellation_token.cancelled() => {
960+
return Err(WorkerQuitReason::Cancelled);
961+
}
962+
_ = init_timeout_sleep => {
963+
return Err(WorkerQuitReason::fatal(
964+
LocalSessionWorkerError::InitTimeout(init_timeout),
965+
"waiting for initialize request",
966+
));
967+
}
968+
};
955969
let SessionEvent::InitializeRequest { request, responder } = evt else {
956970
return Err(WorkerQuitReason::fatal(
957971
LocalSessionWorkerError::UnexpectedEvent(evt),
@@ -1108,13 +1122,18 @@ pub struct SessionConfig {
11081122
/// resume requests. After this duration, completed entries are evicted
11091123
/// and resume will return an error. Default is 60 seconds.
11101124
pub completed_cache_ttl: Duration,
1125+
/// Maximum duration to wait for the `initialize` request after session
1126+
/// creation. If not received within this window, the session is
1127+
/// terminated. Default is 60 seconds. Set to `None` to disable.
1128+
pub init_timeout: Option<Duration>,
11111129
}
11121130

11131131
impl SessionConfig {
11141132
pub const DEFAULT_CHANNEL_CAPACITY: usize = 16;
11151133
pub const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(300);
11161134
pub const DEFAULT_SSE_RETRY: Duration = Duration::from_secs(3);
11171135
pub const DEFAULT_COMPLETED_CACHE_TTL: Duration = Duration::from_secs(60);
1136+
pub const DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
11181137
}
11191138

11201139
impl Default for SessionConfig {
@@ -1124,6 +1143,7 @@ impl Default for SessionConfig {
11241143
keep_alive: Some(Self::DEFAULT_KEEP_ALIVE),
11251144
sse_retry: Some(Self::DEFAULT_SSE_RETRY),
11261145
completed_cache_ttl: Self::DEFAULT_COMPLETED_CACHE_TTL,
1146+
init_timeout: Some(Self::DEFAULT_INIT_TIMEOUT),
11271147
}
11281148
}
11291149
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#![cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
2+
3+
use std::time::Duration;
4+
5+
use rmcp::{
6+
model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId},
7+
transport::streamable_http_server::session::{SessionManager, local::LocalSessionManager},
8+
};
9+
10+
#[tokio::test]
11+
async fn test_init_timeout_terminates_pre_init_session() -> anyhow::Result<()> {
12+
let mut manager = LocalSessionManager::default();
13+
manager.session_config.init_timeout = Some(Duration::from_millis(200));
14+
15+
// Bind the transport so its drop-guard doesn't cancel the worker — we
16+
// want termination via init_timeout, not via cancellation.
17+
let (session_id, _transport) = manager.create_session().await?;
18+
19+
tokio::time::sleep(Duration::from_millis(500)).await;
20+
21+
let message = ClientJsonRpcMessage::request(
22+
ClientRequest::PingRequest(PingRequest::default()),
23+
RequestId::Number(1),
24+
);
25+
let result = manager.initialize_session(&session_id, message).await;
26+
27+
assert!(
28+
result.is_err(),
29+
"expected worker to be dead; got: {result:?}"
30+
);
31+
32+
Ok(())
33+
}
34+
35+
#[tokio::test]
36+
async fn test_init_timeout_none_keeps_worker_alive() -> anyhow::Result<()> {
37+
let mut manager = LocalSessionManager::default();
38+
manager.session_config.init_timeout = None;
39+
40+
let (session_id, _transport) = manager.create_session().await?;
41+
42+
tokio::time::sleep(Duration::from_millis(500)).await;
43+
44+
let message = ClientJsonRpcMessage::request(
45+
ClientRequest::PingRequest(PingRequest::default()),
46+
RequestId::Number(1),
47+
);
48+
// Liveness probe: a live worker accepts the send then stalls waiting for
49+
// a handler response (none is wired up), tripping the outer timeout. A
50+
// dead worker would fail the send and return immediately.
51+
let probe = tokio::time::timeout(
52+
Duration::from_millis(200),
53+
manager.initialize_session(&session_id, message),
54+
)
55+
.await;
56+
57+
assert!(
58+
probe.is_err(),
59+
"expected worker to be alive; got: {probe:?}"
60+
);
61+
62+
Ok(())
63+
}

0 commit comments

Comments
 (0)