@@ -89,17 +89,14 @@ impl SessionManager for LocalSessionManager {
8989 let http_request_id = receiver. http_request_id ;
9090 handle. push_message ( message, http_request_id) . await ?;
9191
92- let priming_events: Vec < ServerSseMessage > = match self . session_config . sse_retry {
93- Some ( retry) => {
94- let event_id = match http_request_id {
95- Some ( id) => format ! ( "0/{id}" ) ,
96- None => "0" . into ( ) ,
97- } ;
98- vec ! [ ServerSseMessage :: priming( event_id, retry) ]
99- }
100- None => vec ! [ ] ,
101- } ;
102- Ok ( futures:: stream:: iter ( priming_events) . chain ( ReceiverStream :: new ( receiver. inner ) ) )
92+ let priming = self . session_config . sse_retry . map ( |retry| {
93+ let event_id = match http_request_id {
94+ Some ( id) => format ! ( "0/{id}" ) ,
95+ None => "0" . into ( ) ,
96+ } ;
97+ ServerSseMessage :: priming ( event_id, retry)
98+ } ) ;
99+ Ok ( futures:: stream:: iter ( priming) . chain ( ReceiverStream :: new ( receiver. inner ) ) )
103100 }
104101
105102 async fn create_standalone_stream (
@@ -421,11 +418,7 @@ impl LocalSessionWorker {
421418 ) -> Result < StreamableHttpMessageReceiver , SessionError > {
422419 let http_request_id = self . next_http_request_id ( ) ;
423420 let ( tx, rx) = tokio:: sync:: mpsc:: channel ( self . session_config . channel_capacity ) ;
424- let starting_index = if self . session_config . sse_retry . is_some ( ) {
425- 1
426- } else {
427- 0
428- } ;
421+ let starting_index = usize:: from ( self . session_config . sse_retry . is_some ( ) ) ;
429422 self . tx_router . insert (
430423 http_request_id,
431424 HttpRequestWise {
0 commit comments