Skip to content

Commit 54a4b13

Browse files
committed
feat(acp-nats): add fs_write_text_file client handler
1 parent 0d6ed26 commit 54a4b13

6 files changed

Lines changed: 345 additions & 1 deletion

File tree

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use crate::client::rpc_reply;
2+
use crate::jsonrpc::extract_request_id;
3+
use crate::nats::{FlushClient, PublishClient};
4+
use agent_client_protocol::{
5+
Client, ErrorCode, Request, Response, WriteTextFileRequest, WriteTextFileResponse,
6+
};
7+
use bytes::Bytes;
8+
use serde::de::Error as SerdeDeError;
9+
use tracing::{instrument, warn};
10+
use trogon_std::JsonSerialize;
11+
12+
#[derive(Debug)]
13+
pub enum FsWriteTextFileError {
14+
InvalidRequest(serde_json::Error),
15+
ClientError(agent_client_protocol::Error),
16+
}
17+
18+
impl std::fmt::Display for FsWriteTextFileError {
19+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20+
match self {
21+
Self::InvalidRequest(e) => write!(f, "invalid request: {}", e),
22+
Self::ClientError(e) => write!(f, "client error: {}", e),
23+
}
24+
}
25+
}
26+
27+
impl std::error::Error for FsWriteTextFileError {
28+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
29+
match self {
30+
Self::InvalidRequest(e) => Some(e),
31+
Self::ClientError(e) => Some(e),
32+
}
33+
}
34+
}
35+
36+
pub fn error_code_and_message(e: &FsWriteTextFileError) -> (ErrorCode, String) {
37+
match e {
38+
FsWriteTextFileError::InvalidRequest(inner) => (
39+
ErrorCode::InvalidParams,
40+
format!("Invalid write_text_file request: {}", inner),
41+
),
42+
FsWriteTextFileError::ClientError(inner) => (inner.code, inner.message.clone()),
43+
}
44+
}
45+
46+
/// Handles write_text_file: parses request, calls client, wraps response in JSON-RPC envelope,
47+
/// and publishes to reply subject. Reply is required (request-reply pattern).
48+
#[instrument(
49+
name = "acp.client.fs.write_text_file",
50+
skip(payload, client, nats, serializer)
51+
)]
52+
pub async fn handle<N: PublishClient + FlushClient, C: Client, S: JsonSerialize>(
53+
payload: &[u8],
54+
client: &C,
55+
reply: Option<&str>,
56+
nats: &N,
57+
session_id: &str,
58+
serializer: &S,
59+
max_payload_bytes: usize,
60+
) {
61+
let reply_to = match reply {
62+
Some(r) => r,
63+
None => {
64+
warn!(
65+
session_id = %session_id,
66+
"write_text_file requires reply subject; ignoring message"
67+
);
68+
return;
69+
}
70+
};
71+
72+
let request_id = extract_request_id(payload);
73+
match forward_to_client(payload, client).await {
74+
Ok(response) => {
75+
let (response_bytes, content_type) = serializer
76+
.to_vec(&Response::Result {
77+
id: request_id.clone(),
78+
result: response,
79+
})
80+
.map(|v| (Bytes::from(v), rpc_reply::CONTENT_TYPE_JSON))
81+
.unwrap_or_else(|e| {
82+
warn!(error = %e, "JSON serialization of response failed, sending error reply");
83+
rpc_reply::error_response_bytes(
84+
serializer,
85+
request_id.clone(),
86+
ErrorCode::InternalError,
87+
&format!("Failed to serialize response: {}", e),
88+
)
89+
});
90+
if response_bytes.len() > max_payload_bytes {
91+
warn!(
92+
size = response_bytes.len(),
93+
max = max_payload_bytes,
94+
"write_text_file response exceeds NATS max payload"
95+
);
96+
let (bytes, content_type) = rpc_reply::error_response_bytes(
97+
serializer,
98+
request_id,
99+
ErrorCode::InternalError,
100+
&format!(
101+
"write_text_file response exceeds NATS max payload: {} > {}",
102+
response_bytes.len(),
103+
max_payload_bytes
104+
),
105+
);
106+
rpc_reply::publish_reply(
107+
nats,
108+
reply_to,
109+
bytes,
110+
content_type,
111+
"fs_write_text_file reply",
112+
)
113+
.await;
114+
} else {
115+
rpc_reply::publish_reply(
116+
nats,
117+
reply_to,
118+
response_bytes,
119+
content_type,
120+
"fs_write_text_file reply",
121+
)
122+
.await;
123+
}
124+
}
125+
Err(e) => {
126+
let (code, message) = error_code_and_message(&e);
127+
warn!(
128+
error = %e,
129+
session_id = %session_id,
130+
"Failed to handle fs_write_text_file"
131+
);
132+
let (bytes, content_type) =
133+
rpc_reply::error_response_bytes(serializer, request_id, code, &message);
134+
rpc_reply::publish_reply(
135+
nats,
136+
reply_to,
137+
bytes,
138+
content_type,
139+
"fs_write_text_file error reply",
140+
)
141+
.await;
142+
}
143+
}
144+
}
145+
146+
async fn forward_to_client<C: Client>(
147+
payload: &[u8],
148+
client: &C,
149+
) -> Result<WriteTextFileResponse, FsWriteTextFileError> {
150+
let envelope: Request<WriteTextFileRequest> =
151+
serde_json::from_slice(payload).map_err(FsWriteTextFileError::InvalidRequest)?;
152+
let request = envelope.params.ok_or_else(|| {
153+
FsWriteTextFileError::InvalidRequest(serde_json::Error::custom("params is null or missing"))
154+
})?;
155+
client
156+
.write_text_file(request)
157+
.await
158+
.map_err(FsWriteTextFileError::ClientError)
159+
}

rsworkspace/crates/acp-nats/src/client/mod.rs

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub(crate) mod fs_read_text_file;
2+
pub(crate) mod fs_write_text_file;
23
pub(crate) mod request_permission;
34
pub(crate) mod rpc_reply;
45
pub(crate) mod session_update;
@@ -196,6 +197,18 @@ async fn dispatch_client_method<
196197
)
197198
.await;
198199
}
200+
ClientMethod::FsWriteTextFile => {
201+
fs_write_text_file::handle(
202+
&payload,
203+
ctx.client,
204+
reply.as_deref(),
205+
ctx.nats,
206+
parsed.session_id.as_str(),
207+
ctx.serializer,
208+
ctx.bridge.config.max_nats_payload_bytes(),
209+
)
210+
.await;
211+
}
199212
ClientMethod::SessionRequestPermission => {
200213
request_permission::handle(
201214
&payload,
@@ -280,7 +293,7 @@ mod tests {
280293
RequestPermissionOutcome, RequestPermissionRequest, RequestPermissionResponse,
281294
SessionNotification, SessionUpdate, TerminalExitStatus, TerminalOutputRequest,
282295
TerminalOutputResponse, ToolCallUpdate, ToolCallUpdateFields, WaitForTerminalExitRequest,
283-
WaitForTerminalExitResponse,
296+
WaitForTerminalExitResponse, WriteTextFileRequest, WriteTextFileResponse,
284297
};
285298
use async_trait::async_trait;
286299
use std::cell::RefCell;
@@ -351,6 +364,13 @@ mod tests {
351364
Ok(ReadTextFileResponse::new("mock file content".to_string()))
352365
}
353366

367+
async fn write_text_file(
368+
&self,
369+
_: WriteTextFileRequest,
370+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
371+
Ok(WriteTextFileResponse::new())
372+
}
373+
354374
async fn create_terminal(
355375
&self,
356376
_: CreateTerminalRequest,
@@ -568,6 +588,87 @@ mod tests {
568588
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
569589
}
570590

591+
#[tokio::test]
592+
async fn dispatch_client_method_dispatches_fs_write_text_file() {
593+
let nats = MockNatsClient::new();
594+
let client = MockClient::new();
595+
let session_id = AcpSessionId::new("sess-1").unwrap();
596+
597+
let envelope = Request {
598+
id: RequestId::Number(42),
599+
method: std::sync::Arc::from("fs/write_text_file"),
600+
params: Some(WriteTextFileRequest::new(
601+
agent_client_protocol::SessionId::from("sess-1"),
602+
"/tmp/test.txt".to_string(),
603+
"content".to_string(),
604+
)),
605+
};
606+
let payload = bytes::Bytes::from(serde_json::to_vec(&envelope).unwrap());
607+
608+
let parsed = crate::nats::ParsedClientSubject {
609+
session_id,
610+
method: ClientMethod::FsWriteTextFile,
611+
};
612+
613+
let bridge = make_bridge(nats.clone());
614+
let ctx = DispatchContext {
615+
nats: &nats,
616+
client: &client,
617+
bridge: &bridge,
618+
serializer: &StdJsonSerialize,
619+
};
620+
dispatch_client_method(
621+
"acp.sess-1.client.fs.write_text_file",
622+
parsed,
623+
payload,
624+
Some("_INBOX.reply".to_string()),
625+
&ctx,
626+
)
627+
.await;
628+
629+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
630+
let payloads = nats.published_payloads();
631+
assert_eq!(payloads.len(), 1);
632+
let response: serde_json::Value =
633+
serde_json::from_slice(payloads[0].as_ref()).expect("response should be valid JSON");
634+
assert_eq!(response["id"], 42, "response must be JSON-RPC envelope with matching id");
635+
assert!(response.get("result").is_some(), "success response must have result field");
636+
}
637+
638+
#[tokio::test]
639+
async fn fs_write_text_file_round_trip() {
640+
let nats = MockNatsClient::new();
641+
let client = MockClient::new();
642+
let envelope = Request {
643+
id: RequestId::Number(1),
644+
method: std::sync::Arc::from("fs/write_text_file"),
645+
params: Some(WriteTextFileRequest::new(
646+
agent_client_protocol::SessionId::from("session-001"),
647+
"/tmp/test.txt".to_string(),
648+
"content".to_string(),
649+
)),
650+
};
651+
let payload = serde_json::to_vec(&envelope).unwrap();
652+
653+
fs_write_text_file::handle(
654+
&payload,
655+
&client,
656+
Some("_INBOX.reply"),
657+
&nats,
658+
"session-001",
659+
&StdJsonSerialize,
660+
1_048_576,
661+
)
662+
.await;
663+
664+
assert_eq!(nats.published_messages(), vec!["_INBOX.reply"]);
665+
let payloads = nats.published_payloads();
666+
assert_eq!(payloads.len(), 1);
667+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
668+
assert_eq!(response["id"], 1);
669+
assert!(response.get("result").is_some());
670+
}
671+
571672
#[tokio::test]
572673
async fn dispatch_client_method_dispatches_terminal_create() {
573674
let nats = MockNatsClient::new();
@@ -2354,6 +2455,13 @@ mod tests {
23542455
Ok(ReadTextFileResponse::new("mock file content".to_string()))
23552456
}
23562457

2458+
async fn write_text_file(
2459+
&self,
2460+
_: WriteTextFileRequest,
2461+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
2462+
Ok(WriteTextFileResponse::new())
2463+
}
2464+
23572465
async fn create_terminal(
23582466
&self,
23592467
_: CreateTerminalRequest,
@@ -2421,6 +2529,13 @@ mod tests {
24212529
Ok(ReadTextFileResponse::new("mock file content".to_string()))
24222530
}
24232531

2532+
async fn write_text_file(
2533+
&self,
2534+
_: WriteTextFileRequest,
2535+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
2536+
Ok(WriteTextFileResponse::new())
2537+
}
2538+
24242539
async fn create_terminal(
24252540
&self,
24262541
_: CreateTerminalRequest,
@@ -2494,6 +2609,13 @@ mod tests {
24942609
Ok(ReadTextFileResponse::new("mock file content".to_string()))
24952610
}
24962611

2612+
async fn write_text_file(
2613+
&self,
2614+
_: WriteTextFileRequest,
2615+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
2616+
Ok(WriteTextFileResponse::new())
2617+
}
2618+
24972619
async fn create_terminal(
24982620
&self,
24992621
_: CreateTerminalRequest,
@@ -2565,6 +2687,13 @@ mod tests {
25652687
Ok(ReadTextFileResponse::new("mock file content".to_string()))
25662688
}
25672689

2690+
async fn write_text_file(
2691+
&self,
2692+
_: WriteTextFileRequest,
2693+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
2694+
Ok(WriteTextFileResponse::new())
2695+
}
2696+
25682697
async fn create_terminal(
25692698
&self,
25702699
_: CreateTerminalRequest,
@@ -3136,6 +3265,13 @@ mod tests {
31363265
Ok(ReadTextFileResponse::new("file contents".to_string()))
31373266
}
31383267

3268+
async fn write_text_file(
3269+
&self,
3270+
_: WriteTextFileRequest,
3271+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
3272+
Ok(WriteTextFileResponse::new())
3273+
}
3274+
31393275
async fn terminal_output(
31403276
&self,
31413277
_: TerminalOutputRequest,

0 commit comments

Comments
 (0)