Skip to content

Commit 74dacbb

Browse files
committed
feat(acp-nats): add terminal_output client handler
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 81ba171 commit 74dacbb

5 files changed

Lines changed: 811 additions & 1 deletion

File tree

rsworkspace/crates/acp-nats/src/client/mod.rs

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub(crate) mod rpc_reply;
44
pub(crate) mod session_update;
55
pub(crate) mod terminal_create;
66
pub(crate) mod terminal_kill;
7+
pub(crate) mod terminal_output;
78

89
use crate::agent::Bridge;
910
use crate::error::AGENT_UNAVAILABLE;
@@ -225,6 +226,17 @@ async fn dispatch_client_method<
225226
)
226227
.await;
227228
}
229+
ClientMethod::TerminalOutput => {
230+
terminal_output::handle(
231+
&payload,
232+
ctx.client,
233+
reply.as_deref(),
234+
ctx.nats,
235+
parsed.session_id.as_str(),
236+
ctx.serializer,
237+
)
238+
.await;
239+
}
228240
}
229241
}
230242

@@ -237,7 +249,7 @@ mod tests {
237249
KillTerminalCommandRequest, KillTerminalCommandResponse, ReadTextFileRequest,
238250
ReadTextFileResponse, Request, RequestId, RequestPermissionOutcome,
239251
RequestPermissionRequest, RequestPermissionResponse, SessionNotification, SessionUpdate,
240-
ToolCallUpdate, ToolCallUpdateFields,
252+
TerminalOutputRequest, TerminalOutputResponse, ToolCallUpdate, ToolCallUpdateFields,
241253
};
242254
use async_trait::async_trait;
243255
use std::cell::RefCell;
@@ -248,19 +260,25 @@ mod tests {
248260
pub(super) struct MockClient {
249261
notifications: RefCell<Vec<String>>,
250262
kill_terminal_calls: RefCell<usize>,
263+
terminal_output_calls: RefCell<usize>,
251264
}
252265

253266
impl MockClient {
254267
pub(super) fn new() -> Self {
255268
Self {
256269
notifications: RefCell::new(Vec::new()),
257270
kill_terminal_calls: RefCell::new(0),
271+
terminal_output_calls: RefCell::new(0),
258272
}
259273
}
260274

261275
pub(super) fn kill_terminal_call_count(&self) -> usize {
262276
*self.kill_terminal_calls.borrow()
263277
}
278+
279+
pub(super) fn terminal_output_call_count(&self) -> usize {
280+
*self.terminal_output_calls.borrow()
281+
}
264282
}
265283

266284
#[async_trait(?Send)]
@@ -304,6 +322,17 @@ mod tests {
304322
*self.kill_terminal_calls.borrow_mut() += 1;
305323
Ok(KillTerminalCommandResponse::new())
306324
}
325+
326+
async fn terminal_output(
327+
&self,
328+
_: TerminalOutputRequest,
329+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
330+
*self.terminal_output_calls.borrow_mut() += 1;
331+
Ok(TerminalOutputResponse::new(
332+
"mock output".to_string(),
333+
false,
334+
))
335+
}
307336
}
308337

309338
fn make_msg(subject: &str, payload: &[u8], reply: Option<&str>) -> async_nats::Message {
@@ -577,6 +606,171 @@ mod tests {
577606
assert!(response.get("error").is_none());
578607
}
579608

609+
#[tokio::test]
610+
async fn dispatch_client_method_dispatches_terminal_output() {
611+
let nats = MockNatsClient::new();
612+
let client = MockClient::new();
613+
let session_id = AcpSessionId::new("sess-1").unwrap();
614+
615+
let envelope = Request {
616+
id: RequestId::Number(1),
617+
method: std::sync::Arc::from("terminal/output"),
618+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
619+
};
620+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
621+
622+
let parsed = crate::nats::ParsedClientSubject {
623+
session_id,
624+
method: ClientMethod::TerminalOutput,
625+
};
626+
627+
let ctx = DispatchContext {
628+
nats: &nats,
629+
client: &client,
630+
serializer: &StdJsonSerialize,
631+
};
632+
dispatch_client_method(
633+
"acp.sess-1.client.terminal.output",
634+
parsed,
635+
payload,
636+
Some("_INBOX.reply".to_string()),
637+
&ctx,
638+
)
639+
.await;
640+
641+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
642+
let payloads = nats.published_payloads();
643+
assert_eq!(payloads.len(), 1);
644+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
645+
assert_eq!(response.get("id"), Some(&serde_json::Value::from(1)));
646+
assert!(response.get("result").is_some());
647+
assert!(response.get("error").is_none());
648+
assert_eq!(
649+
client.terminal_output_call_count(),
650+
1,
651+
"terminal_output handler must run"
652+
);
653+
assert_eq!(
654+
client.kill_terminal_call_count(),
655+
0,
656+
"kill handler must not run"
657+
);
658+
}
659+
660+
#[tokio::test]
661+
async fn dispatch_client_method_dispatches_terminal_output_client_error_publishes_error_reply()
662+
{
663+
let nats = MockNatsClient::new();
664+
let client = TerminalKillFailingClient;
665+
let session_id = AcpSessionId::new("sess-1").unwrap();
666+
667+
let envelope = Request {
668+
id: RequestId::Number(1),
669+
method: std::sync::Arc::from("terminal/output"),
670+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
671+
};
672+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
673+
674+
let parsed = crate::nats::ParsedClientSubject {
675+
session_id,
676+
method: ClientMethod::TerminalOutput,
677+
};
678+
679+
let ctx = DispatchContext {
680+
nats: &nats,
681+
client: &client,
682+
serializer: &StdJsonSerialize,
683+
};
684+
dispatch_client_method(
685+
"acp.sess-1.client.terminal.output",
686+
parsed,
687+
payload,
688+
Some("_INBOX.reply".to_string()),
689+
&ctx,
690+
)
691+
.await;
692+
693+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
694+
let payloads = nats.published_payloads();
695+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
696+
assert!(response.get("error").is_some());
697+
assert_eq!(
698+
response.get("error").and_then(|e| e.get("code")),
699+
Some(&serde_json::Value::from(-32603))
700+
);
701+
}
702+
703+
#[tokio::test]
704+
async fn dispatch_client_method_dispatches_terminal_output_with_rpc_mock_client() {
705+
let nats = MockNatsClient::new();
706+
let client = RpcMockClient;
707+
let session_id = AcpSessionId::new("sess-1").unwrap();
708+
709+
let envelope = Request {
710+
id: RequestId::Number(1),
711+
method: std::sync::Arc::from("terminal/output"),
712+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
713+
};
714+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
715+
716+
let parsed = crate::nats::ParsedClientSubject {
717+
session_id,
718+
method: ClientMethod::TerminalOutput,
719+
};
720+
721+
let ctx = DispatchContext {
722+
nats: &nats,
723+
client: &client,
724+
serializer: &StdJsonSerialize,
725+
};
726+
dispatch_client_method(
727+
"acp.sess-1.client.terminal.output",
728+
parsed,
729+
payload,
730+
Some("_INBOX.reply".to_string()),
731+
&ctx,
732+
)
733+
.await;
734+
735+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
736+
}
737+
738+
#[tokio::test]
739+
async fn dispatch_client_method_dispatches_terminal_output_serialization_fallback() {
740+
let nats = MockNatsClient::new();
741+
let client = MockClient::new();
742+
let serializer = FailNextSerialize::new(1);
743+
let session_id = AcpSessionId::new("sess-1").unwrap();
744+
745+
let envelope = Request {
746+
id: RequestId::Number(1),
747+
method: std::sync::Arc::from("terminal/output"),
748+
params: Some(TerminalOutputRequest::new("sess-1", "term-001")),
749+
};
750+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
751+
752+
let parsed = crate::nats::ParsedClientSubject {
753+
session_id,
754+
method: ClientMethod::TerminalOutput,
755+
};
756+
757+
let ctx = DispatchContext {
758+
nats: &nats,
759+
client: &client,
760+
serializer: &serializer,
761+
};
762+
dispatch_client_method(
763+
"acp.sess-1.client.terminal.output",
764+
parsed,
765+
payload,
766+
Some("_INBOX.reply".to_string()),
767+
&ctx,
768+
)
769+
.await;
770+
771+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
772+
}
773+
580774
#[tokio::test]
581775
async fn dispatch_client_method_terminal_kill_no_reply_does_not_call_client_or_publish() {
582776
let nats = MockNatsClient::new();
@@ -753,6 +947,16 @@ mod tests {
753947
"mock kill_terminal_command failure",
754948
))
755949
}
950+
951+
async fn terminal_output(
952+
&self,
953+
_: TerminalOutputRequest,
954+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
955+
Err(agent_client_protocol::Error::new(
956+
-32603,
957+
"mock terminal_output failure",
958+
))
959+
}
756960
}
757961

758962
#[tokio::test]
@@ -1259,6 +1463,16 @@ mod tests {
12591463
) -> agent_client_protocol::Result<ReadTextFileResponse> {
12601464
Ok(ReadTextFileResponse::new("file contents".to_string()))
12611465
}
1466+
1467+
async fn terminal_output(
1468+
&self,
1469+
_: TerminalOutputRequest,
1470+
) -> agent_client_protocol::Result<TerminalOutputResponse> {
1471+
Ok(TerminalOutputResponse::new(
1472+
"rpc mock output".to_string(),
1473+
false,
1474+
))
1475+
}
12621476
}
12631477

12641478
#[tokio::test]

0 commit comments

Comments
 (0)