Skip to content

Commit e6a9fe4

Browse files
committed
feat(acp-nats): add terminal_output client handler
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 81ba171 commit e6a9fe4

5 files changed

Lines changed: 758 additions & 1 deletion

File tree

rsworkspace/crates/acp-nats/src/client/mod.rs

Lines changed: 211 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub(crate) mod rpc_reply;
44
pub(crate) mod session_update;
55
pub(crate) mod terminal_create;
66
pub(crate) mod terminal_kill;
7+
pub(crate) mod terminal_output;
78

89
use crate::agent::Bridge;
910
use crate::error::AGENT_UNAVAILABLE;
@@ -225,6 +226,17 @@ async fn dispatch_client_method<
225226
)
226227
.await;
227228
}
229+
ClientMethod::TerminalOutput => {
230+
terminal_output::handle(
231+
&payload,
232+
ctx.client,
233+
reply.as_deref(),
234+
ctx.nats,
235+
parsed.session_id.as_str(),
236+
ctx.serializer,
237+
)
238+
.await;
239+
}
228240
}
229241
}
230242

@@ -237,7 +249,7 @@ mod tests {
237249
KillTerminalCommandRequest, KillTerminalCommandResponse, ReadTextFileRequest,
238250
ReadTextFileResponse, Request, RequestId, RequestPermissionOutcome,
239251
RequestPermissionRequest, RequestPermissionResponse, SessionNotification, SessionUpdate,
240-
ToolCallUpdate, ToolCallUpdateFields,
252+
TerminalOutputRequest, TerminalOutputResponse, ToolCallUpdate, ToolCallUpdateFields,
241253
};
242254
use async_trait::async_trait;
243255
use std::cell::RefCell;
@@ -248,19 +260,25 @@ mod tests {
248260
pub(super) struct MockClient {
249261
notifications: RefCell<Vec<String>>,
250262
kill_terminal_calls: RefCell<usize>,
263+
terminal_output_calls: RefCell<usize>,
251264
}
252265

253266
impl MockClient {
254267
pub(super) fn new() -> Self {
255268
Self {
256269
notifications: RefCell::new(Vec::new()),
257270
kill_terminal_calls: RefCell::new(0),
271+
terminal_output_calls: RefCell::new(0),
258272
}
259273
}
260274

261275
pub(super) fn kill_terminal_call_count(&self) -> usize {
262276
*self.kill_terminal_calls.borrow()
263277
}
278+
279+
pub(super) fn terminal_output_call_count(&self) -> usize {
280+
*self.terminal_output_calls.borrow()
281+
}
264282
}
265283

266284
#[async_trait(?Send)]
@@ -304,6 +322,17 @@ mod tests {
304322
*self.kill_terminal_calls.borrow_mut() += 1;
305323
Ok(KillTerminalCommandResponse::new())
306324
}
325+
326+
async fn terminal_output(
327+
&self,
328+
_: TerminalOutputRequest,
329+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
330+
*self.terminal_output_calls.borrow_mut() += 1;
331+
Ok(TerminalOutputResponse::new(
332+
"mock output".to_string(),
333+
false,
334+
))
335+
}
307336
}
308337

309338
fn make_msg(subject: &str, payload: &[u8], reply: Option<&str>) -> async_nats::Message {
@@ -577,6 +606,170 @@ mod tests {
577606
assert!(response.get("error").is_none());
578607
}
579608

609+
#[tokio::test]
610+
async fn dispatch_client_method_dispatches_terminal_output() {
611+
let nats = MockNatsClient::new();
612+
let client = MockClient::new();
613+
let session_id = AcpSessionId::new("sess-1").unwrap();
614+
615+
let envelope = Request {
616+
id: RequestId::Number(1),
617+
method: std::sync::Arc::from("terminal/output"),
618+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
619+
};
620+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
621+
622+
let parsed = crate::nats::ParsedClientSubject {
623+
session_id,
624+
method: ClientMethod::TerminalOutput,
625+
};
626+
627+
let ctx = DispatchContext {
628+
nats: &nats,
629+
client: &client,
630+
serializer: &StdJsonSerialize,
631+
};
632+
dispatch_client_method(
633+
"acp.sess-1.client.terminal.output",
634+
parsed,
635+
payload,
636+
Some("_INBOX.reply".to_string()),
637+
&ctx,
638+
)
639+
.await;
640+
641+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
642+
let payloads = nats.published_payloads();
643+
assert_eq!(payloads.len(), 1);
644+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
645+
assert_eq!(response.get("id"), Some(&serde_json::Value::from(1)));
646+
assert!(response.get("result").is_some());
647+
assert!(response.get("error").is_none());
648+
assert_eq!(
649+
client.terminal_output_call_count(),
650+
1,
651+
"terminal_output handler must run"
652+
);
653+
assert_eq!(
654+
client.kill_terminal_call_count(),
655+
0,
656+
"kill handler must not run"
657+
);
658+
}
659+
660+
#[tokio::test]
661+
async fn dispatch_client_method_dispatches_terminal_output_client_error_publishes_error_reply() {
662+
let nats = MockNatsClient::new();
663+
let client = TerminalKillFailingClient;
664+
let session_id = AcpSessionId::new("sess-1").unwrap();
665+
666+
let envelope = Request {
667+
id: RequestId::Number(1),
668+
method: std::sync::Arc::from("terminal/output"),
669+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
670+
};
671+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
672+
673+
let parsed = crate::nats::ParsedClientSubject {
674+
session_id,
675+
method: ClientMethod::TerminalOutput,
676+
};
677+
678+
let ctx = DispatchContext {
679+
nats: &nats,
680+
client: &client,
681+
serializer: &StdJsonSerialize,
682+
};
683+
dispatch_client_method(
684+
"acp.sess-1.client.terminal.output",
685+
parsed,
686+
payload,
687+
Some("_INBOX.reply".to_string()),
688+
&ctx,
689+
)
690+
.await;
691+
692+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
693+
let payloads = nats.published_payloads();
694+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
695+
assert!(response.get("error").is_some());
696+
assert_eq!(
697+
response.get("error").and_then(|e| e.get("code")),
698+
Some(&serde_json::Value::from(-32603))
699+
);
700+
}
701+
702+
#[tokio::test]
703+
async fn dispatch_client_method_dispatches_terminal_output_with_rpc_mock_client() {
704+
let nats = MockNatsClient::new();
705+
let client = RpcMockClient;
706+
let session_id = AcpSessionId::new("sess-1").unwrap();
707+
708+
let envelope = Request {
709+
id: RequestId::Number(1),
710+
method: std::sync::Arc::from("terminal/output"),
711+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
712+
};
713+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
714+
715+
let parsed = crate::nats::ParsedClientSubject {
716+
session_id,
717+
method: ClientMethod::TerminalOutput,
718+
};
719+
720+
let ctx = DispatchContext {
721+
nats: &nats,
722+
client: &client,
723+
serializer: &StdJsonSerialize,
724+
};
725+
dispatch_client_method(
726+
"acp.sess-1.client.terminal.output",
727+
parsed,
728+
payload,
729+
Some("_INBOX.reply".to_string()),
730+
&ctx,
731+
)
732+
.await;
733+
734+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
735+
}
736+
737+
#[tokio::test]
738+
async fn dispatch_client_method_dispatches_terminal_output_serialization_fallback() {
739+
let nats = MockNatsClient::new();
740+
let client = MockClient::new();
741+
let serializer = FailNextSerialize::new(1);
742+
let session_id = AcpSessionId::new("sess-1").unwrap();
743+
744+
let envelope = Request {
745+
id: RequestId::Number(1),
746+
method: std::sync::Arc::from("terminal/output"),
747+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
748+
};
749+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
750+
751+
let parsed = crate::nats::ParsedClientSubject {
752+
session_id,
753+
method: ClientMethod::TerminalOutput,
754+
};
755+
756+
let ctx = DispatchContext {
757+
nats: &nats,
758+
client: &client,
759+
serializer: &serializer,
760+
};
761+
dispatch_client_method(
762+
"acp.sess-1.client.terminal.output",
763+
parsed,
764+
payload,
765+
Some("_INBOX.reply".to_string()),
766+
&ctx,
767+
)
768+
.await;
769+
770+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
771+
}
772+
580773
#[tokio::test]
581774
async fn dispatch_client_method_terminal_kill_no_reply_does_not_call_client_or_publish() {
582775
let nats = MockNatsClient::new();
@@ -753,6 +946,16 @@ mod tests {
753946
"mock kill_terminal_command failure",
754947
))
755948
}
949+
950+
async fn terminal_output(
951+
&self,
952+
_: TerminalOutputRequest,
953+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
954+
Err(agent_client_protocol::Error::new(
955+
-32603,
956+
"mock terminal_output failure",
957+
))
958+
}
756959
}
757960

758961
#[tokio::test]
@@ -1259,6 +1462,13 @@ mod tests {
12591462
) -> agent_client_protocol::Result<ReadTextFileResponse> {
12601463
Ok(ReadTextFileResponse::new("file contents".to_string()))
12611464
}
1465+
1466+
async fn terminal_output(
1467+
&self,
1468+
_: TerminalOutputRequest,
1469+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
1470+
Ok(TerminalOutputResponse::new("rpc mock output".to_string(), false))
1471+
}
12621472
}
12631473

12641474
#[tokio::test]

0 commit comments

Comments
 (0)