Skip to content

Commit cb82a02

Browse files
committed
refactor(acp-nats): simplify client dispatch and terminal_output handling
- Centralize client dispatch reply handling - Add Content-Type header to centralized reply publishing - Simplify terminal_output by removing custom error types - Preserve semantic JSON-RPC error codes - Restore session ID cross-check in terminal_output - Clean up session_update and ext_session_prompt_response Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 01be2e5 commit cb82a02

4 files changed

Lines changed: 160 additions & 364 deletions

File tree

rsworkspace/crates/acp-nats/src/client/ext_session_prompt_response.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ mod tests {
142142

143143
let payload = response_with_prompt_id(StopReason::EndTurn, token);
144144

145-
handle("prompt-resp-001", &payload, None, &bridge).await;
145+
super::handle("prompt-resp-001", &payload, None, &bridge).await;
146146

147147
let result = rx
148148
.await
@@ -156,7 +156,7 @@ mod tests {
156156
let bridge = make_bridge();
157157
let payload = response_with_prompt_id(StopReason::EndTurn, PromptToken(0));
158158

159-
handle("no-waiter-session", &payload, None, &bridge).await;
159+
super::handle("no-waiter-session", &payload, None, &bridge).await;
160160
}
161161

162162
#[tokio::test]
@@ -174,7 +174,7 @@ mod tests {
174174
token.0
175175
);
176176

177-
handle("bad-payload-001", payload.as_bytes(), None, &bridge).await;
177+
super::handle("bad-payload-001", payload.as_bytes(), None, &bridge).await;
178178

179179
let result = rx
180180
.await
@@ -196,7 +196,7 @@ mod tests {
196196
let response = PromptResponse::new(StopReason::EndTurn);
197197
let payload = serde_json::to_vec(&response).unwrap();
198198

199-
handle("no-token-session", &payload, None, &bridge).await;
199+
super::handle("no-token-session", &payload, None, &bridge).await;
200200

201201
assert!(
202202
bridge
@@ -222,9 +222,9 @@ mod tests {
222222

223223
let payload = response_with_prompt_id(StopReason::EndTurn, token);
224224

225-
handle("session.with.dots", &payload, None, &bridge).await;
226-
handle("session*wild", &payload, None, &bridge).await;
227-
handle("session id", &payload, None, &bridge).await;
225+
super::handle("session.with.dots", &payload, None, &bridge).await;
226+
super::handle("session*wild", &payload, None, &bridge).await;
227+
super::handle("session id", &payload, None, &bridge).await;
228228

229229
assert!(
230230
bridge
@@ -267,7 +267,7 @@ mod tests {
267267
.unwrap();
268268

269269
let late_payload = response_with_prompt_id(StopReason::EndTurn, token1);
270-
handle("same-session", &late_payload, None, &bridge).await;
270+
super::handle("same-session", &late_payload, None, &bridge).await;
271271

272272
assert!(
273273
bridge

rsworkspace/crates/acp-nats/src/client/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ pub async fn run<
7171
let wildcard = client::wildcards::all(bridge.config.acp_prefix());
7272
info!("Starting client proxy - subscribing to {}", wildcard);
7373

74-
// TODO: change `run` to return `Result` and propagate this error once there is a caller.
7574
let mut subscriber = match nats.subscribe(wildcard).await {
7675
Ok(sub) => sub,
7776
Err(e) => {
@@ -221,7 +220,7 @@ async fn dispatch_client_method<
221220
.await;
222221
}
223222
ClientMethod::SessionUpdate => {
224-
session_update::handle(&payload, ctx.client, &parsed.session_id).await;
223+
session_update::handle(&payload, ctx.client, reply.is_some()).await;
225224
}
226225
ClientMethod::ExtSessionPromptResponse => {
227226
ext_session_prompt_response::handle(

rsworkspace/crates/acp-nats/src/client/session_update.rs

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use crate::session_id::AcpSessionId;
21
use agent_client_protocol::{Client, SessionNotification};
3-
use tracing::{info, instrument, warn};
2+
use tracing::{instrument, warn};
43

5-
#[instrument(name = "acp.client.session.update", skip(payload, client), fields(session_id = %session_id))]
6-
pub async fn handle<C: Client>(payload: &[u8], client: &C, session_id: &AcpSessionId) {
7-
info!(session_id = %session_id, "Forwarding session update to client");
4+
#[instrument(name = "acp.client.session.update", skip(payload, client))]
5+
pub async fn handle<C: Client>(payload: &[u8], client: &C, has_reply: bool) {
6+
if has_reply {
7+
warn!("Unexpected reply subject on notification request");
8+
}
89
match serde_json::from_slice::<SessionNotification>(payload) {
910
Ok(notification) => {
1011
if let Err(e) = client.session_notification(notification).await {
@@ -21,17 +22,12 @@ pub async fn handle<C: Client>(payload: &[u8], client: &C, session_id: &AcpSessi
2122
mod tests {
2223
use super::*;
2324
use agent_client_protocol::{
24-
ContentBlock, ContentChunk, RequestPermissionRequest, RequestPermissionResponse,
25-
SessionUpdate,
25+
ContentBlock, ContentChunk, RequestPermissionOutcome, RequestPermissionRequest,
26+
RequestPermissionResponse, SessionUpdate,
2627
};
2728
use async_trait::async_trait;
2829
use std::cell::RefCell;
2930

30-
fn session_id(s: &str) -> AcpSessionId {
31-
AcpSessionId::new(s).unwrap()
32-
}
33-
34-
#[derive(Debug)]
3531
struct MockClient {
3632
notifications_received: RefCell<Vec<String>>,
3733
should_fail: bool,
@@ -62,7 +58,7 @@ mod tests {
6258
async fn session_notification(
6359
&self,
6460
notification: SessionNotification,
65-
) -> Result<(), agent_client_protocol::Error> {
61+
) -> agent_client_protocol::Result<()> {
6662
if self.should_fail {
6763
return Err(agent_client_protocol::Error::new(-1, "mock failure"));
6864
}
@@ -75,57 +71,43 @@ mod tests {
7571
async fn request_permission(
7672
&self,
7773
_: RequestPermissionRequest,
78-
) -> Result<RequestPermissionResponse, agent_client_protocol::Error> {
79-
Err(agent_client_protocol::Error::new(
80-
-32603,
81-
"not implemented in test mock",
74+
) -> agent_client_protocol::Result<RequestPermissionResponse> {
75+
Ok(RequestPermissionResponse::new(
76+
RequestPermissionOutcome::Cancelled,
8277
))
8378
}
8479
}
8580

8681
#[tokio::test]
87-
async fn session_update_forwards_notification_to_client() {
82+
async fn forwards_notification_to_client() {
8883
let client = MockClient::new();
8984
let notification = SessionNotification::new(
9085
"session-001",
9186
SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::from("hello"))),
9287
);
9388
let payload = serde_json::to_vec(&notification).unwrap();
9489

95-
handle(&payload, &client, &session_id("session-001")).await;
90+
super::handle(&payload, &client, false).await;
9691

9792
assert_eq!(client.notification_count(), 1);
9893
}
9994

10095
#[tokio::test]
101-
async fn session_update_invalid_payload_does_not_panic() {
96+
async fn invalid_payload_does_not_panic() {
10297
let client = MockClient::new();
103-
handle(b"not json", &client, &session_id("session-001")).await;
98+
super::handle(b"not json", &client, false).await;
10499
assert_eq!(client.notification_count(), 0);
105100
}
106101

107102
#[tokio::test]
108-
async fn session_update_client_error_does_not_panic() {
103+
async fn client_error_does_not_panic() {
109104
let client = MockClient::failing();
110105
let notification = SessionNotification::new(
111106
"session-001",
112107
SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::from("hello"))),
113108
);
114109
let payload = serde_json::to_vec(&notification).unwrap();
115110

116-
handle(&payload, &client, &session_id("session-001")).await;
117-
}
118-
119-
#[tokio::test]
120-
async fn mock_client_request_permission_returns_err() {
121-
let client = MockClient::new();
122-
let req: RequestPermissionRequest = serde_json::from_value(serde_json::json!({
123-
"sessionId": "sess-1",
124-
"toolCall": { "toolCallId": "call-1" },
125-
"options": []
126-
}))
127-
.unwrap();
128-
let result = client.request_permission(req).await;
129-
assert!(result.is_err());
111+
super::handle(&payload, &client, false).await;
130112
}
131113
}

0 commit comments

Comments
 (0)