-
Notifications
You must be signed in to change notification settings - Fork 244
Expand file tree
/
Copy pathstream_broadcast.rs
More file actions
278 lines (252 loc) · 9.34 KB
/
stream_broadcast.rs
File metadata and controls
278 lines (252 loc) · 9.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
//!
//! This module provides functionality to observe the JSON-RPC message stream between
//! clients and agents. It's primarily used for debugging, logging, and building
//! development tools that need to monitor the protocol communication.
use std::sync::Arc;
use anyhow::Result;
use serde::Serialize;
use serde_json::value::RawValue;
use crate::{
Error,
rpc::{Id, OutgoingMessage, ResponseResult, Side},
};
/// One observed JSON-RPC message paired with the direction it travelled.
///
/// A value of this type describes a request, a response, or a notification
/// (see [`StreamMessageContent`]) and records whether this side of the
/// connection received it or sent it (see [`StreamMessageDirection`]).
///
/// It is meant for observation only: debugging, logging, and tooling that
/// watches the protocol traffic.
#[derive(Debug, Clone)]
pub struct StreamMessage {
    /// Whether this side of the connection received or sent the message.
    pub direction: StreamMessageDirection,
    /// The request, response, or notification that was observed.
    pub message: StreamMessageContent,
}
/// Which way a message travelled, seen from this side of the connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMessageDirection {
    /// The message arrived from the other side of the connection.
    Incoming,
    /// The message was sent to the other side of the connection.
    Outgoing,
}
/// The payload of an observed message.
///
/// There is one variant per kind of JSON-RPC message:
/// - [`Request`](Self::Request): a method call that expects a response
/// - [`Response`](Self::Response): the reply to an earlier request
/// - [`Notification`](Self::Notification): a one-way message with no reply
#[derive(Debug, Clone)]
pub enum StreamMessageContent {
    /// A JSON-RPC request.
    Request {
        /// Identifier of this request.
        id: Id,
        /// Name of the method being invoked.
        method: Arc<str>,
        /// Parameters of the call as a JSON value, if any.
        params: Option<serde_json::Value>,
    },
    /// A JSON-RPC response.
    Response {
        /// Identifier of the request being answered.
        id: Id,
        /// Outcome of the request: `Ok` with an optional JSON value on
        /// success, `Err` with the protocol error otherwise.
        result: Result<Option<serde_json::Value>, Error>,
    },
    /// A JSON-RPC notification.
    Notification {
        /// Name of the notification method.
        method: Arc<str>,
        /// Parameters of the notification as a JSON value, if any.
        params: Option<serde_json::Value>,
    },
}
/// A receiver for observing the message stream.
///
/// This is a thin wrapper around an `async_broadcast::Receiver` of
/// [`StreamMessage`] values. It yields copies of the messages that were
/// broadcast for the connection, which is useful for debugging, logging, or
/// building development tools.
///
/// # Example
///
/// ```no_run
/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
///
/// async fn monitor_messages(mut receiver: StreamReceiver) {
///     while let Ok(message) = receiver.recv().await {
///         match message.direction {
///             StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
///             StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
///         }
///     }
/// }
/// ```
pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
impl StreamReceiver {
    /// Waits for and returns the next message from the stream.
    ///
    /// The call delegates to the wrapped `async_broadcast` receiver and
    /// suspends until that receiver produces a message or fails.
    ///
    /// # Errors
    ///
    /// Returns an error whenever the wrapped receiver's `recv` fails (for
    /// example once the channel has been closed); the underlying error is
    /// converted into an `anyhow` error.
    pub async fn recv(&mut self) -> Result<StreamMessage> {
        let message = self.0.recv().await?;
        Ok(message)
    }
}
/// Internal sender for broadcasting stream messages.
///
/// This is a crate-private wrapper around an `async_broadcast::Sender` of
/// [`StreamMessage`] values. Its methods turn RPC messages into
/// [`StreamMessage`]s and broadcast them to the attached receivers.
/// You typically won't interact with this directly.
pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
impl StreamSender {
/// Broadcasts an outgoing message to all receivers.
pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
if self.0.receiver_count() == 0 {
return;
}
let message = StreamMessage {
direction: StreamMessageDirection::Outgoing,
message: match message {
OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
id: id.clone(),
method: method.clone(),
params: serde_json::to_value(params).ok(),
},
OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
id: id.clone(),
result: match result {
ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
ResponseResult::Error(error) => Err(error.clone()),
},
},
OutgoingMessage::Notification { method, params } => {
StreamMessageContent::Notification {
method: method.clone(),
params: serde_json::to_value(params).ok(),
}
}
},
};
self.0.try_broadcast(message).ok();
}
/// Broadcasts an incoming request to all receivers.
pub(crate) fn incoming_request(
&self,
id: Id,
method: impl Into<Arc<str>>,
params: &impl Serialize,
) {
if self.0.receiver_count() == 0 {
return;
}
let message = StreamMessage {
direction: StreamMessageDirection::Incoming,
message: StreamMessageContent::Request {
id,
method: method.into(),
params: serde_json::to_value(params).ok(),
},
};
self.0.try_broadcast(message).ok();
}
/// Broadcasts an incoming response to all receivers.
pub(crate) fn incoming_response(&self, id: Id, result: Result<Option<&RawValue>, &Error>) {
if self.0.receiver_count() == 0 {
return;
}
let result = match result {
Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
Ok(None) => Ok(None),
Err(err) => Err(err.clone()),
};
let message = StreamMessage {
direction: StreamMessageDirection::Incoming,
message: StreamMessageContent::Response { id, result },
};
self.0.try_broadcast(message).ok();
}
/// Broadcasts an incoming notification to all receivers.
pub(crate) fn incoming_notification(
&self,
method: impl Into<Arc<str>>,
params: &impl Serialize,
) {
if self.0.receiver_count() == 0 {
return;
}
let message = StreamMessage {
direction: StreamMessageDirection::Incoming,
message: StreamMessageContent::Notification {
method: method.into(),
params: serde_json::to_value(params).ok(),
},
};
self.0.try_broadcast(message).ok();
}
}
/// Factory for receivers that observe the RPC message stream.
///
/// It keeps hold of an inactive receiver of the broadcast channel, from
/// which any number of active [`StreamReceiver`]s can be created on demand.
pub(crate) struct StreamBroadcast {
    /// Inactive handle on the channel; cloned and activated for each new
    /// observer.
    receiver: async_broadcast::InactiveReceiver<StreamMessage>,
}
impl StreamBroadcast {
    /// Creates the broadcast channel.
    ///
    /// Returns the sender used to publish messages together with the
    /// broadcast instance used to create receivers. The channel starts with
    /// a capacity of 1 and its initial receiver is deactivated, so the
    /// channel has no active receiver until [`Self::receiver`] is called.
    pub(crate) fn new() -> (StreamSender, Self) {
        let (tx, rx) = async_broadcast::broadcast(1);
        let sender = StreamSender(tx);
        let broadcast = Self {
            receiver: rx.deactivate(),
        };
        (sender, broadcast)
    }

    /// Creates a new active receiver for observing the message stream.
    ///
    /// If no receiver was active before this call, the channel capacity is
    /// raised to 64 now that there is someone to receive the messages.
    pub(crate) fn receiver(&self) -> StreamReceiver {
        // Sample the count before activating: the new receiver would
        // otherwise be included in it.
        let had_no_receivers = self.receiver.receiver_count() == 0;
        let mut active = self.receiver.activate_cloned();
        if had_no_receivers {
            active.set_capacity(64);
        }
        StreamReceiver(active)
    }
}
/// Converts an owned outgoing message into its observable form.
///
/// The resulting [`StreamMessage`] is always marked as
/// [`StreamMessageDirection::Outgoing`]. Parameters and results are
/// converted to JSON values; a value that fails to convert becomes `None`.
impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
    fn from(message: OutgoingMessage<Local, Remote>) -> Self {
        let content = match message {
            OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
                id,
                method,
                params: serde_json::to_value(params).ok(),
            },
            OutgoingMessage::Response { id, result } => {
                let outcome = match result {
                    ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
                    ResponseResult::Error(error) => Err(error),
                };
                StreamMessageContent::Response {
                    id,
                    result: outcome,
                }
            }
            OutgoingMessage::Notification { method, params } => {
                StreamMessageContent::Notification {
                    method,
                    params: serde_json::to_value(params).ok(),
                }
            }
        };

        Self {
            direction: StreamMessageDirection::Outgoing,
            message: content,
        }
    }
}