Skip to content

Commit 2a9b911

Browse files
committed
feat: add Default and constructors to ServerSseMessage
1 parent cabf71a commit 2a9b911

3 files changed

Lines changed: 34 additions & 37 deletions

File tree

crates/rmcp/src/transport/common/server_side_http.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl sse_stream::Timer for TokioTimer {
5757
}
5858
}
5959

60-
#[derive(Debug, Clone)]
60+
#[derive(Debug, Clone, Default)]
6161
#[non_exhaustive]
6262
pub struct ServerSseMessage {
6363
/// The event ID for this message. When set, clients can use this ID
@@ -71,6 +71,28 @@ pub struct ServerSseMessage {
7171
pub retry: Option<Duration>,
7272
}
7373

74+
impl ServerSseMessage {
75+
/// Create a message carrying a JSON-RPC response/notification with an event ID.
76+
pub fn new(event_id: impl Into<String>, message: ServerJsonRpcMessage) -> Self {
77+
Self {
78+
event_id: Some(event_id.into()),
79+
message: Some(Arc::new(message)),
80+
retry: None,
81+
}
82+
}
83+
84+
/// Create a priming event that tells the client to reconnect after `retry`
85+
/// if the connection drops.
86+
/// See [SEP-1699](https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699).
87+
pub fn priming(event_id: impl Into<String>, retry: Duration) -> Self {
88+
Self {
89+
event_id: Some(event_id.into()),
90+
message: None,
91+
retry: Some(retry),
92+
}
93+
}
94+
}
95+
7496
pub(crate) fn sse_stream_response(
7597
stream: impl futures::Stream<Item = ServerSseMessage> + Send + Sync + 'static,
7698
keep_alive: Option<Duration>,

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
collections::{HashMap, HashSet, VecDeque},
33
num::ParseIntError,
4-
sync::Arc,
54
time::Duration,
65
};
76

@@ -222,21 +221,13 @@ impl CachedTx {
222221

223222
async fn send(&mut self, message: ServerJsonRpcMessage) {
224223
let event_id = self.next_event_id();
225-
let message = ServerSseMessage {
226-
event_id: Some(event_id.to_string()),
227-
message: Some(Arc::new(message)),
228-
retry: None,
229-
};
224+
let message = ServerSseMessage::new(event_id.to_string(), message);
230225
self.cache_and_send(message).await;
231226
}
232227

233228
async fn send_priming(&mut self, retry: Duration) {
234229
let event_id = self.next_event_id();
235-
let message = ServerSseMessage {
236-
event_id: Some(event_id.to_string()),
237-
message: None,
238-
retry: Some(retry),
239-
};
230+
let message = ServerSseMessage::priming(event_id.to_string(), retry);
240231
self.cache_and_send(message).await;
241232
}
242233

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -499,11 +499,7 @@ where
499499
.map_err(internal_error_response("create standalone stream"))?;
500500
// Prepend priming event if sse_retry configured
501501
let stream = if let Some(retry) = self.config.sse_retry {
502-
let priming = ServerSseMessage {
503-
event_id: Some("0".into()),
504-
message: None,
505-
retry: Some(retry),
506-
};
502+
let priming = ServerSseMessage::priming("0", retry);
507503
futures::stream::once(async move { priming })
508504
.chain(stream)
509505
.left_stream()
@@ -609,11 +605,7 @@ where
609605
.map_err(internal_error_response("get session"))?;
610606
// Prepend priming event if sse_retry configured
611607
let stream = if let Some(retry) = self.config.sse_retry {
612-
let priming = ServerSseMessage {
613-
event_id: Some("0".into()),
614-
message: None,
615-
retry: Some(retry),
616-
};
608+
let priming = ServerSseMessage::priming("0", retry);
617609
futures::stream::once(async move { priming })
618610
.chain(stream)
619611
.left_stream()
@@ -688,19 +680,13 @@ where
688680
.await
689681
.map_err(internal_error_response("create stream"))?;
690682
let stream = futures::stream::once(async move {
691-
ServerSseMessage {
692-
event_id: None,
693-
message: Some(Arc::new(response)),
694-
retry: None,
695-
}
683+
let mut msg = ServerSseMessage::default();
684+
msg.message = Some(Arc::new(response));
685+
msg
696686
});
697687
// Prepend priming event if sse_retry configured
698688
let stream = if let Some(retry) = self.config.sse_retry {
699-
let priming = ServerSseMessage {
700-
event_id: Some("0".into()),
701-
message: None,
702-
retry: Some(retry),
703-
};
689+
let priming = ServerSseMessage::priming("0", retry);
704690
futures::stream::once(async move { priming })
705691
.chain(stream)
706692
.left_stream()
@@ -774,11 +760,9 @@ where
774760
// SSE mode (default): original behaviour preserved unchanged
775761
let stream = ReceiverStream::new(receiver).map(|message| {
776762
tracing::trace!(?message);
777-
ServerSseMessage {
778-
event_id: None,
779-
message: Some(Arc::new(message)),
780-
retry: None,
781-
}
763+
let mut msg = ServerSseMessage::default();
764+
msg.message = Some(Arc::new(message));
765+
msg
782766
});
783767
Ok(sse_stream_response(
784768
stream,

0 commit comments

Comments
 (0)