Skip to content

Commit 45a4cc5

Browse files
authored
feat: add Default and constructors to ServerSseMessage (#794)
* feat: add Default and constructors to ServerSseMessage * fix: add tests, missing feature gates, small test issues
1 parent 5f43283 commit 45a4cc5

File tree

5 files changed

+92
-40
lines changed

5 files changed

+92
-40
lines changed

crates/rmcp/src/transport/common/http_header.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,10 @@ pub(crate) fn extract_scope_from_header(header: &str) -> Option<String> {
6565

6666
#[cfg(test)]
6767
mod tests {
68+
#[cfg(feature = "client-side-sse")]
6869
use super::*;
6970

71+
#[cfg(feature = "client-side-sse")]
7072
#[test]
7173
fn extract_scope_quoted() {
7274
let header = r#"Bearer error="insufficient_scope", scope="files:read files:write""#;
@@ -76,6 +78,7 @@ mod tests {
7678
);
7779
}
7880

81+
#[cfg(feature = "client-side-sse")]
7982
#[test]
8083
fn extract_scope_unquoted() {
8184
let header = r#"Bearer scope=read:data, error="insufficient_scope""#;
@@ -85,12 +88,14 @@ mod tests {
8588
);
8689
}
8790

91+
#[cfg(feature = "client-side-sse")]
8892
#[test]
8993
fn extract_scope_missing() {
9094
let header = r#"Bearer error="invalid_token""#;
9195
assert_eq!(extract_scope_from_header(header), None);
9296
}
9397

98+
#[cfg(feature = "client-side-sse")]
9499
#[test]
95100
fn extract_scope_empty_header() {
96101
assert_eq!(extract_scope_from_header("Bearer"), None);

crates/rmcp/src/transport/common/server_side_http.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl sse_stream::Timer for TokioTimer {
5757
}
5858
}
5959

60-
#[derive(Debug, Clone)]
60+
#[derive(Debug, Clone, Default)]
6161
#[non_exhaustive]
6262
pub struct ServerSseMessage {
6363
/// The event ID for this message. When set, clients can use this ID
@@ -71,6 +71,37 @@ pub struct ServerSseMessage {
7171
pub retry: Option<Duration>,
7272
}
7373

74+
impl ServerSseMessage {
75+
/// Create a message carrying a JSON-RPC response/notification with an event ID.
76+
pub fn new(event_id: impl Into<String>, message: ServerJsonRpcMessage) -> Self {
77+
Self {
78+
event_id: Some(event_id.into()),
79+
message: Some(Arc::new(message)),
80+
retry: None,
81+
}
82+
}
83+
84+
/// Wrap a JSON-RPC message without an event ID or retry hint.
85+
pub fn from_message(message: ServerJsonRpcMessage) -> Self {
86+
Self {
87+
event_id: None,
88+
message: Some(Arc::new(message)),
89+
retry: None,
90+
}
91+
}
92+
93+
/// Create a priming event that tells the client to reconnect after `retry`
94+
/// if the connection drops.
95+
/// See [SEP-1699](https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699).
96+
pub fn priming(event_id: impl Into<String>, retry: Duration) -> Self {
97+
Self {
98+
event_id: Some(event_id.into()),
99+
message: None,
100+
retry: Some(retry),
101+
}
102+
}
103+
}
104+
74105
pub(crate) fn sse_stream_response(
75106
stream: impl futures::Stream<Item = ServerSseMessage> + Send + Sync + 'static,
76107
keep_alive: Option<Duration>,
@@ -169,3 +200,49 @@ where
169200
}
170201
}
171202
}
203+
204+
#[cfg(test)]
205+
mod tests {
206+
use super::*;
207+
use crate::model::{EmptyResult, JsonRpcResponse, JsonRpcVersion2_0, RequestId, ServerResult};
208+
209+
fn dummy_message() -> ServerJsonRpcMessage {
210+
ServerJsonRpcMessage::Response(JsonRpcResponse {
211+
jsonrpc: JsonRpcVersion2_0,
212+
id: RequestId::Number(1),
213+
result: ServerResult::EmptyResult(EmptyResult {}),
214+
})
215+
}
216+
217+
#[test]
218+
fn default_has_all_none() {
219+
let msg = ServerSseMessage::default();
220+
assert!(msg.event_id.is_none());
221+
assert!(msg.message.is_none());
222+
assert!(msg.retry.is_none());
223+
}
224+
225+
#[test]
226+
fn new_sets_event_id_and_message() {
227+
let msg = ServerSseMessage::new("42", dummy_message());
228+
assert_eq!(msg.event_id.as_deref(), Some("42"));
229+
assert!(msg.message.is_some());
230+
assert!(msg.retry.is_none());
231+
}
232+
233+
#[test]
234+
fn from_message_has_no_event_id() {
235+
let msg = ServerSseMessage::from_message(dummy_message());
236+
assert!(msg.event_id.is_none());
237+
assert!(msg.message.is_some());
238+
assert!(msg.retry.is_none());
239+
}
240+
241+
#[test]
242+
fn priming_sets_event_id_and_retry() {
243+
let msg = ServerSseMessage::priming("0", Duration::from_secs(5));
244+
assert_eq!(msg.event_id.as_deref(), Some("0"));
245+
assert!(msg.message.is_none());
246+
assert_eq!(msg.retry, Some(Duration::from_secs(5)));
247+
}
248+
}

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
collections::{HashMap, HashSet, VecDeque},
33
num::ParseIntError,
4-
sync::Arc,
54
time::Duration,
65
};
76

@@ -222,21 +221,13 @@ impl CachedTx {
222221

223222
async fn send(&mut self, message: ServerJsonRpcMessage) {
224223
let event_id = self.next_event_id();
225-
let message = ServerSseMessage {
226-
event_id: Some(event_id.to_string()),
227-
message: Some(Arc::new(message)),
228-
retry: None,
229-
};
224+
let message = ServerSseMessage::new(event_id.to_string(), message);
230225
self.cache_and_send(message).await;
231226
}
232227

233228
async fn send_priming(&mut self, retry: Duration) {
234229
let event_id = self.next_event_id();
235-
let message = ServerSseMessage {
236-
event_id: Some(event_id.to_string()),
237-
message: None,
238-
retry: Some(retry),
239-
};
230+
let message = ServerSseMessage::priming(event_id.to_string(), retry);
240231
self.cache_and_send(message).await;
241232
}
242233

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -499,11 +499,7 @@ where
499499
.map_err(internal_error_response("create standalone stream"))?;
500500
// Prepend priming event if sse_retry configured
501501
let stream = if let Some(retry) = self.config.sse_retry {
502-
let priming = ServerSseMessage {
503-
event_id: Some("0".into()),
504-
message: None,
505-
retry: Some(retry),
506-
};
502+
let priming = ServerSseMessage::priming("0", retry);
507503
futures::stream::once(async move { priming })
508504
.chain(stream)
509505
.left_stream()
@@ -609,11 +605,7 @@ where
609605
.map_err(internal_error_response("get session"))?;
610606
// Prepend priming event if sse_retry configured
611607
let stream = if let Some(retry) = self.config.sse_retry {
612-
let priming = ServerSseMessage {
613-
event_id: Some("0".into()),
614-
message: None,
615-
retry: Some(retry),
616-
};
608+
let priming = ServerSseMessage::priming("0", retry);
617609
futures::stream::once(async move { priming })
618610
.chain(stream)
619611
.left_stream()
@@ -687,20 +679,11 @@ where
687679
.initialize_session(&session_id, message)
688680
.await
689681
.map_err(internal_error_response("create stream"))?;
690-
let stream = futures::stream::once(async move {
691-
ServerSseMessage {
692-
event_id: None,
693-
message: Some(Arc::new(response)),
694-
retry: None,
695-
}
696-
});
682+
let stream =
683+
futures::stream::once(async move { ServerSseMessage::from_message(response) });
697684
// Prepend priming event if sse_retry configured
698685
let stream = if let Some(retry) = self.config.sse_retry {
699-
let priming = ServerSseMessage {
700-
event_id: Some("0".into()),
701-
message: None,
702-
retry: Some(retry),
703-
};
686+
let priming = ServerSseMessage::priming("0", retry);
704687
futures::stream::once(async move { priming })
705688
.chain(stream)
706689
.left_stream()
@@ -774,11 +757,7 @@ where
774757
// SSE mode (default): original behaviour preserved unchanged
775758
let stream = ReceiverStream::new(receiver).map(|message| {
776759
tracing::trace!(?message);
777-
ServerSseMessage {
778-
event_id: None,
779-
message: Some(Arc::new(message)),
780-
retry: None,
781-
}
760+
ServerSseMessage::from_message(message)
782761
});
783762
Ok(sse_stream_response(
784763
stream,

crates/rmcp/tests/test_inflight_response_drain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![cfg(not(feature = "local"))]
1+
#![cfg(all(feature = "client", feature = "server", not(feature = "local")))]
22
// cargo test --test test_inflight_response_drain --features "client server"
33

44
use std::{

0 commit comments

Comments
 (0)