Skip to content

Commit 3d534f9

Browse files
committed
feat(acp-nats): add fs_write_text_file client handler
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 0d6ed26 commit 3d534f9

5 files changed

Lines changed: 434 additions & 1 deletion

File tree

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
use crate::client::rpc_reply;
2+
use crate::jsonrpc::extract_request_id;
3+
use crate::nats::{FlushClient, PublishClient};
4+
use agent_client_protocol::{
5+
Client, ErrorCode, Request, Response, WriteTextFileRequest, WriteTextFileResponse,
6+
};
7+
use bytes::Bytes;
8+
use serde::de::Error as SerdeDeError;
9+
use tracing::{instrument, warn};
10+
use trogon_std::JsonSerialize;
11+
12+
#[derive(Debug)]
13+
pub enum FsWriteTextFileError {
14+
InvalidRequest(serde_json::Error),
15+
ClientError(agent_client_protocol::Error),
16+
}
17+
18+
impl std::fmt::Display for FsWriteTextFileError {
19+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20+
match self {
21+
Self::InvalidRequest(e) => write!(f, "invalid request: {}", e),
22+
Self::ClientError(e) => write!(f, "client error: {}", e),
23+
}
24+
}
25+
}
26+
27+
impl std::error::Error for FsWriteTextFileError {
28+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
29+
match self {
30+
Self::InvalidRequest(e) => Some(e),
31+
Self::ClientError(e) => Some(e),
32+
}
33+
}
34+
}
35+
36+
pub fn error_code_and_message(e: &FsWriteTextFileError) -> (ErrorCode, String) {
37+
match e {
38+
FsWriteTextFileError::InvalidRequest(inner) => (
39+
ErrorCode::InvalidParams,
40+
format!("Invalid write_text_file request: {}", inner),
41+
),
42+
FsWriteTextFileError::ClientError(inner) => (inner.code, inner.message.clone()),
43+
}
44+
}
45+
46+
/// Handles write_text_file: parses request, calls client, wraps response in JSON-RPC envelope,
47+
/// and publishes to reply subject. Reply is required (request-reply pattern).
48+
#[instrument(
49+
name = "acp.client.fs.write_text_file",
50+
skip(payload, client, nats, serializer)
51+
)]
52+
pub async fn handle<N: PublishClient + FlushClient, C: Client, S: JsonSerialize>(
53+
payload: &[u8],
54+
client: &C,
55+
reply: Option<&str>,
56+
nats: &N,
57+
session_id: &str,
58+
serializer: &S,
59+
) {
60+
let reply_to = match reply {
61+
Some(r) => r,
62+
None => {
63+
warn!(
64+
session_id = %session_id,
65+
"write_text_file requires reply subject; ignoring message"
66+
);
67+
return;
68+
}
69+
};
70+
71+
let request_id = extract_request_id(payload);
72+
match forward_to_client(payload, client, session_id).await {
73+
Ok(response) => {
74+
let (response_bytes, content_type) = serializer
75+
.to_vec(&Response::Result {
76+
id: request_id.clone(),
77+
result: response,
78+
})
79+
.map(|v| (Bytes::from(v), rpc_reply::CONTENT_TYPE_JSON))
80+
.unwrap_or_else(|e| {
81+
warn!(error = %e, "JSON serialization of response failed, sending error reply");
82+
rpc_reply::error_response_bytes(
83+
serializer,
84+
request_id,
85+
ErrorCode::InternalError,
86+
&format!("Failed to serialize response: {}", e),
87+
)
88+
});
89+
rpc_reply::publish_reply(
90+
nats,
91+
reply_to,
92+
response_bytes,
93+
content_type,
94+
"fs_write_text_file reply",
95+
)
96+
.await;
97+
}
98+
Err(e) => {
99+
let (code, message) = error_code_and_message(&e);
100+
warn!(
101+
error = %e,
102+
session_id = %session_id,
103+
"Failed to handle fs_write_text_file"
104+
);
105+
let (bytes, content_type) =
106+
rpc_reply::error_response_bytes(serializer, request_id, code, &message);
107+
rpc_reply::publish_reply(
108+
nats,
109+
reply_to,
110+
bytes,
111+
content_type,
112+
"fs_write_text_file error reply",
113+
)
114+
.await;
115+
}
116+
}
117+
}
118+
119+
async fn forward_to_client<C: Client>(
120+
payload: &[u8],
121+
client: &C,
122+
expected_session_id: &str,
123+
) -> Result<WriteTextFileResponse, FsWriteTextFileError> {
124+
let envelope: Request<WriteTextFileRequest> =
125+
serde_json::from_slice(payload).map_err(FsWriteTextFileError::InvalidRequest)?;
126+
let request = envelope.params.ok_or_else(|| {
127+
FsWriteTextFileError::InvalidRequest(serde_json::Error::custom("params is null or missing"))
128+
})?;
129+
let params_session_id = request.session_id.to_string();
130+
if params_session_id != expected_session_id {
131+
return Err(FsWriteTextFileError::InvalidRequest(
132+
serde_json::Error::custom(format!(
133+
"params.sessionId ({}) does not match subject session id ({})",
134+
params_session_id, expected_session_id
135+
)),
136+
));
137+
}
138+
client
139+
.write_text_file(request)
140+
.await
141+
.map_err(FsWriteTextFileError::ClientError)
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
use agent_client_protocol::{
148+
ReadTextFileRequest, ReadTextFileResponse, Request, RequestId, RequestPermissionRequest,
149+
RequestPermissionResponse, SessionNotification,
150+
};
151+
use async_trait::async_trait;
152+
use trogon_nats::MockNatsClient;
153+
use trogon_std::StdJsonSerialize;
154+
155+
struct FailingClient;
156+
157+
#[async_trait(?Send)]
158+
impl Client for FailingClient {
159+
async fn session_notification(
160+
&self,
161+
_: SessionNotification,
162+
) -> agent_client_protocol::Result<()> {
163+
Ok(())
164+
}
165+
166+
async fn request_permission(
167+
&self,
168+
_: RequestPermissionRequest,
169+
) -> agent_client_protocol::Result<RequestPermissionResponse> {
170+
Err(agent_client_protocol::Error::new(
171+
-32603,
172+
"not implemented in test mock",
173+
))
174+
}
175+
176+
async fn read_text_file(
177+
&self,
178+
_: ReadTextFileRequest,
179+
) -> agent_client_protocol::Result<ReadTextFileResponse> {
180+
Err(agent_client_protocol::Error::new(
181+
-32603,
182+
"not implemented in test mock",
183+
))
184+
}
185+
186+
async fn write_text_file(
187+
&self,
188+
_: WriteTextFileRequest,
189+
) -> agent_client_protocol::Result<WriteTextFileResponse> {
190+
Err(agent_client_protocol::Error::new(
191+
i32::from(ErrorCode::InvalidParams),
192+
"permission denied",
193+
))
194+
}
195+
}
196+
197+
#[tokio::test]
198+
async fn handle_client_error_publishes_error_reply_with_matching_id() {
199+
let nats = MockNatsClient::new();
200+
let client = FailingClient;
201+
let envelope = Request {
202+
id: RequestId::Number(99),
203+
method: std::sync::Arc::from("fs/write_text_file"),
204+
params: Some(WriteTextFileRequest::new(
205+
agent_client_protocol::SessionId::from("sess-1"),
206+
"/forbidden.txt".to_string(),
207+
"content".to_string(),
208+
)),
209+
};
210+
let payload = serde_json::to_vec(&envelope).unwrap();
211+
212+
handle(
213+
&payload,
214+
&client,
215+
Some("_INBOX.err"),
216+
&nats,
217+
"sess-1",
218+
&StdJsonSerialize,
219+
)
220+
.await;
221+
222+
assert_eq!(nats.published_messages(), vec!["_INBOX.err"]);
223+
let payloads = nats.published_payloads();
224+
assert_eq!(payloads.len(), 1);
225+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
226+
assert_eq!(
227+
response["id"], 99,
228+
"error response must preserve request id"
229+
);
230+
assert!(
231+
response.get("error").is_some(),
232+
"error response must have error field"
233+
);
234+
assert_eq!(
235+
response["error"]["code"],
236+
i32::from(ErrorCode::InvalidParams)
237+
);
238+
}
239+
240+
#[tokio::test]
241+
async fn handle_invalid_payload_publishes_error_reply() {
242+
let nats = MockNatsClient::new();
243+
let client = FailingClient;
244+
245+
handle(
246+
b"not json",
247+
&client,
248+
Some("_INBOX.err"),
249+
&nats,
250+
"sess-1",
251+
&StdJsonSerialize,
252+
)
253+
.await;
254+
255+
assert_eq!(nats.published_messages(), vec!["_INBOX.err"]);
256+
let payloads = nats.published_payloads();
257+
assert_eq!(payloads.len(), 1);
258+
let response: serde_json::Value = serde_json::from_slice(payloads[0].as_ref()).unwrap();
259+
assert!(response.get("error").is_some());
260+
}
261+
262+
#[test]
263+
fn error_code_and_message_client_error_preserves_client_code() {
264+
let client_err =
265+
agent_client_protocol::Error::new(ErrorCode::InvalidParams.into(), "permission denied");
266+
let fs_err = FsWriteTextFileError::ClientError(client_err);
267+
let (code, message) = error_code_and_message(&fs_err);
268+
assert_eq!(code, ErrorCode::InvalidParams);
269+
assert_eq!(message, "permission denied");
270+
}
271+
}

0 commit comments

Comments
 (0)