Skip to content

Commit 84db305

Browse files
committed
feat: optional session store
1 parent 52c93e9 commit 84db305

File tree

7 files changed

+908
-43
lines changed

7 files changed

+908
-43
lines changed

crates/rmcp/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,12 @@ required-features = [
307307
]
308308
path = "tests/test_streamable_http_stale_session.rs"
309309

310+
[[test]]
311+
name = "test_streamable_http_session_store"
312+
required-features = [
313+
"client",
314+
"server",
315+
"transport-streamable-http-client-reqwest",
316+
"transport-streamable-http-server",
317+
]
318+
path = "tests/test_streamable_http_session_store.rs"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
pub mod session;
22
#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
33
pub mod tower;
4-
pub use session::{SessionId, SessionManager};
4+
pub use session::{RestoreOutcome, SessionId, SessionManager, SessionRestoreMarker};
55
#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
66
pub use tower::{StreamableHttpServerConfig, StreamableHttpService};

crates/rmcp/src/transport/streamable_http_server/session.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,39 @@ use crate::{
3030

3131
pub mod local;
3232
pub mod never;
33+
pub mod store;
34+
35+
pub use store::{SessionState, SessionStore, SessionStoreError};
36+
37+
/// Extension marker inserted into the `initialize` request extensions during a
38+
/// session restore replay. Handlers can check for its presence to distinguish a
39+
/// cross-instance restore from a genuine client-initiated `initialize` request.
40+
///
41+
/// ```rust,ignore
42+
/// if req.extensions().get::<SessionRestoreMarker>().is_some() {
43+
/// // this is a restore replay, not a fresh client connection
44+
/// }
45+
/// ```
46+
#[derive(Debug, Clone)]
47+
pub struct SessionRestoreMarker {
48+
pub id: SessionId,
49+
}
50+
51+
/// The outcome of a [`SessionManager::restore_session`] call.
52+
#[derive(Debug)]
53+
pub enum RestoreOutcome<T> {
54+
/// The session was just re-created from external state; the caller must
55+
/// spawn an MCP handler against the returned transport and replay the
56+
/// `initialize` handshake.
57+
Restored(T),
58+
/// The session was already present in memory (e.g. a concurrent request
59+
/// already restored it). The caller should proceed as if `has_session`
60+
/// had returned `true` — no further action is required.
61+
AlreadyPresent,
62+
/// This session manager does not support external-store restore.
63+
/// The caller should fall through to the normal 404 response.
64+
NotSupported,
65+
}
3366

3467
/// Controls how MCP sessions are created, validated, and closed.
3568
///
@@ -98,4 +131,22 @@ pub trait SessionManager: Send + Sync + 'static {
98131
) -> impl Future<
99132
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
100133
> + Send;
134+
135+
/// Attempt to restore a previously-known session from external state,
136+
/// creating a fresh in-memory session worker with the given `id`.
137+
///
138+
/// See [`RestoreOutcome`] for the three possible results:
139+
/// - [`RestoreOutcome::Restored`] — session re-created; caller must spawn
140+
/// an MCP handler and replay the `initialize` handshake.
141+
/// - [`RestoreOutcome::AlreadyPresent`] — session is already in memory
142+
/// (e.g. a concurrent request restored it first); caller proceeds
143+
/// normally.
144+
/// - [`RestoreOutcome::NotSupported`] (default) — this session manager
145+
/// does not support external-store restore; caller returns 404.
146+
fn restore_session(
147+
&self,
148+
_id: SessionId,
149+
) -> impl Future<Output = Result<RestoreOutcome<Self::Transport>, Self::Error>> + Send {
150+
futures::future::ready(Ok(RestoreOutcome::NotSupported))
151+
}
101152
}

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,20 @@ impl SessionManager for LocalSessionManager {
130130
handle.push_message(message, None).await?;
131131
Ok(())
132132
}
133+
134+
async fn restore_session(
135+
&self,
136+
id: SessionId,
137+
) -> Result<RestoreOutcome<Self::Transport>, Self::Error> {
138+
let mut sessions = self.sessions.write().await;
139+
if sessions.contains_key(&id) {
140+
// A concurrent request already restored this session.
141+
return Ok(RestoreOutcome::AlreadyPresent);
142+
}
143+
let (handle, worker) = create_local_session(id.clone(), self.session_config.clone());
144+
sessions.insert(id, handle);
145+
Ok(RestoreOutcome::Restored(WorkerTransport::spawn(worker)))
146+
}
133147
}
134148

135149
/// `<index>/request_id>`
@@ -182,7 +196,7 @@ impl std::str::FromStr for EventId {
182196
}
183197
}
184198

185-
use super::{ServerSseMessage, SessionManager};
199+
use super::{RestoreOutcome, ServerSseMessage, SessionManager};
186200

187201
struct CachedTx {
188202
tx: Sender<ServerSseMessage>,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use crate::model::InitializeRequestParams;
2+
3+
/// State persisted to an external store for cross-instance session recovery.
4+
///
5+
/// When a client reconnects to a different server instance, the new instance
6+
/// loads this state to transparently replay the `initialize` handshake without
7+
/// the client needing to re-initialize.
8+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
9+
pub struct SessionState {
10+
/// Parameters from the client's original `initialize` request.
11+
pub initialize_params: InitializeRequestParams,
12+
}
13+
14+
/// Type alias for boxed session store errors.
15+
pub type SessionStoreError = Box<dyn std::error::Error + Send + Sync + 'static>;
16+
17+
/// Pluggable external session store for cross-instance recovery.
18+
///
19+
/// Implement this trait to back sessions with Redis, a database, or any
20+
/// key-value store. The simplest usage is to set
21+
/// [`StreamableHttpServerConfig::session_store`] to an `Arc<impl SessionStore>`.
22+
///
23+
/// # Example (in-memory, for testing)
24+
///
25+
/// ```rust,ignore
26+
/// use std::{collections::HashMap, sync::Arc};
27+
/// use tokio::sync::RwLock;
28+
/// use rmcp::transport::streamable_http_server::session::store::{
29+
/// SessionState, SessionStore, SessionStoreError,
30+
/// };
31+
///
32+
/// #[derive(Default)]
33+
/// struct InMemoryStore(Arc<RwLock<HashMap<String, SessionState>>>);
34+
///
35+
/// #[async_trait::async_trait]
36+
/// impl SessionStore for InMemoryStore {
37+
/// async fn load(&self, id: &str) -> Result<Option<SessionState>, SessionStoreError> {
38+
/// Ok(self.0.read().await.get(id).cloned())
39+
/// }
40+
/// async fn store(&self, id: &str, state: &SessionState) -> Result<(), SessionStoreError> {
41+
/// self.0.write().await.insert(id.to_owned(), state.clone());
42+
/// Ok(())
43+
/// }
44+
/// async fn delete(&self, id: &str) -> Result<(), SessionStoreError> {
45+
/// self.0.write().await.remove(id);
46+
/// Ok(())
47+
/// }
48+
/// }
49+
/// ```
50+
#[async_trait::async_trait]
51+
pub trait SessionStore: Send + Sync + 'static {
52+
/// Load session state for the given `session_id`.
53+
///
54+
/// Returns `Ok(None)` when no entry exists (i.e. session is unknown to the store).
55+
async fn load(&self, session_id: &str) -> Result<Option<SessionState>, SessionStoreError>;
56+
57+
/// Persist session state for the given `session_id`.
58+
async fn store(&self, session_id: &str, state: &SessionState) -> Result<(), SessionStoreError>;
59+
60+
/// Remove session state for the given `session_id`.
61+
async fn delete(&self, session_id: &str) -> Result<(), SessionStoreError>;
62+
}

0 commit comments

Comments
 (0)