@@ -6,6 +6,7 @@ use http::{Method, Request, Response, header::ALLOW};
66use http_body:: Body ;
77use http_body_util:: { BodyExt , Full , combinators:: BoxBody } ;
88use tokio_stream:: wrappers:: ReceiverStream ;
9+ use tokio_util:: sync:: CancellationToken ;
910
1011use super :: session:: SessionManager ;
1112use crate :: {
@@ -33,13 +34,19 @@ pub struct StreamableHttpServerConfig {
3334 pub sse_keep_alive : Option < Duration > ,
3435 /// If true, the server will create a session for each request and keep it alive.
3536 pub stateful_mode : bool ,
37+ /// Cancellation token for the Streamable HTTP server.
38+ ///
39+ /// When this token is cancelled, all active sessions are terminated and
40+ /// the server stops accepting new requests.
41+ pub cancellation_token : CancellationToken ,
3642}
3743
3844impl Default for StreamableHttpServerConfig {
3945 fn default ( ) -> Self {
4046 Self {
4147 sse_keep_alive : Some ( Duration :: from_secs ( 15 ) ) ,
4248 stateful_mode : true ,
49+ cancellation_token : CancellationToken :: new ( ) ,
4350 }
4451 }
4552}
@@ -209,15 +216,23 @@ where
209216 . resume ( & session_id, last_event_id)
210217 . await
211218 . map_err ( internal_error_response ( "resume session" ) ) ?;
212- Ok ( sse_stream_response ( stream, self . config . sse_keep_alive ) )
219+ Ok ( sse_stream_response (
220+ stream,
221+ self . config . sse_keep_alive ,
222+ self . config . cancellation_token . child_token ( ) ,
223+ ) )
213224 } else {
214225 // create standalone stream
215226 let stream = self
216227 . session_manager
217228 . create_standalone_stream ( & session_id)
218229 . await
219230 . map_err ( internal_error_response ( "create standalone stream" ) ) ?;
220- Ok ( sse_stream_response ( stream, self . config . sse_keep_alive ) )
231+ Ok ( sse_stream_response (
232+ stream,
233+ self . config . sse_keep_alive ,
234+ self . config . cancellation_token . child_token ( ) ,
235+ ) )
221236 }
222237 }
223238
@@ -307,7 +322,11 @@ where
307322 . create_stream ( & session_id, message)
308323 . await
309324 . map_err ( internal_error_response ( "get session" ) ) ?;
310- Ok ( sse_stream_response ( stream, self . config . sse_keep_alive ) )
325+ Ok ( sse_stream_response (
326+ stream,
327+ self . config . sse_keep_alive ,
328+ self . config . cancellation_token . child_token ( ) ,
329+ ) )
311330 }
312331 ClientJsonRpcMessage :: Notification ( _)
313332 | ClientJsonRpcMessage :: Response ( _)
@@ -380,6 +399,7 @@ where
380399 }
381400 } ) ,
382401 self . config . sse_keep_alive ,
402+ self . config . cancellation_token . child_token ( ) ,
383403 ) ;
384404
385405 response. headers_mut ( ) . insert (
@@ -413,6 +433,7 @@ where
413433 }
414434 } ) ,
415435 self . config . sse_keep_alive ,
436+ self . config . cancellation_token . child_token ( ) ,
416437 ) )
417438 }
418439 ClientJsonRpcMessage :: Notification ( _notification) => {
0 commit comments