Skip to content

Commit 2161fa2

Browse files
committed
feat(acp-nats): add fs_read_text_file client handler
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent c4beead commit 2161fa2

8 files changed

Lines changed: 370 additions & 11 deletions

File tree

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
use crate::jsonrpc::extract_request_id;
2+
use crate::nats::{FlushClient, PublishClient, headers_with_trace_context};
3+
use agent_client_protocol::{
4+
Client, Error, ErrorCode, ReadTextFileRequest, ReadTextFileResponse, Request, RequestId,
5+
Response,
6+
};
7+
use bytes::Bytes;
8+
use tracing::{instrument, warn};
9+
10+
async fn publish_reply<N: PublishClient + FlushClient>(
11+
nats: &N,
12+
reply_to: &str,
13+
bytes: Bytes,
14+
context: &str,
15+
) {
16+
let headers = headers_with_trace_context();
17+
if let Err(e) = nats
18+
.publish_with_headers(reply_to.to_string(), headers, bytes)
19+
.await
20+
{
21+
warn!(error = %e, "Failed to publish {}", context);
22+
}
23+
if let Err(e) = nats.flush().await {
24+
warn!(error = %e, "Failed to flush {}", context);
25+
}
26+
}
27+
28+
fn error_response_bytes(request_id: RequestId, code: ErrorCode, message: &str) -> Bytes {
29+
serde_json::to_vec(&Response::<()>::Error {
30+
id: request_id,
31+
error: Error::new(i32::from(code), message),
32+
})
33+
.unwrap_or_else(|e| {
34+
serde_json::to_vec(&Response::<()>::Error {
35+
id: RequestId::Null,
36+
error: Error::new(
37+
i32::from(code),
38+
format!("{} (serialization failed: {})", message, e),
39+
),
40+
})
41+
.unwrap()
42+
})
43+
.into()
44+
}
45+
46+
#[derive(Debug)]
47+
pub enum FsReadTextFileError {
48+
InvalidRequest(serde_json::Error),
49+
ClientError(agent_client_protocol::Error),
50+
}
51+
52+
impl std::fmt::Display for FsReadTextFileError {
53+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54+
match self {
55+
Self::InvalidRequest(e) => write!(f, "invalid request: {}", e),
56+
Self::ClientError(e) => write!(f, "client error: {}", e),
57+
}
58+
}
59+
}
60+
61+
impl std::error::Error for FsReadTextFileError {
62+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
63+
match self {
64+
Self::InvalidRequest(e) => Some(e),
65+
Self::ClientError(e) => Some(e),
66+
}
67+
}
68+
}
69+
70+
pub fn error_code_and_message(e: &FsReadTextFileError) -> (ErrorCode, String) {
71+
match e {
72+
FsReadTextFileError::InvalidRequest(inner) => (
73+
ErrorCode::InvalidParams,
74+
format!("Invalid read_text_file request: {}", inner),
75+
),
76+
FsReadTextFileError::ClientError(inner) => (inner.code, inner.message.clone()),
77+
}
78+
}
79+
80+
/// Handles read_text_file: parses request, calls client, wraps response in JSON-RPC envelope,
81+
/// and publishes to reply subject. Reply is required (request-reply pattern).
82+
#[instrument(name = "acp.client.fs.read_text_file", skip(payload, client, nats))]
83+
pub async fn handle<N: PublishClient + FlushClient, C: Client>(
84+
payload: &[u8],
85+
client: &C,
86+
reply: Option<&str>,
87+
nats: &N,
88+
session_id: &str,
89+
) {
90+
let reply_to = match reply {
91+
Some(r) => r,
92+
None => {
93+
warn!(
94+
session_id = %session_id,
95+
"read_text_file requires reply subject; ignoring message"
96+
);
97+
return;
98+
}
99+
};
100+
101+
let request_id = extract_request_id(payload);
102+
match forward_to_client(payload, client).await {
103+
Ok(response) => {
104+
let response_bytes = serde_json::to_vec(&Response::Result {
105+
id: request_id.clone(),
106+
result: response,
107+
})
108+
.map(Bytes::from)
109+
.unwrap_or_else(|e| {
110+
error_response_bytes(
111+
request_id,
112+
ErrorCode::InternalError,
113+
&format!("Failed to serialize read_text_file response: {}", e),
114+
)
115+
});
116+
publish_reply(nats, reply_to, response_bytes, "fs_read_text_file reply").await;
117+
}
118+
Err(e) => {
119+
let (code, message) = error_code_and_message(&e);
120+
warn!(
121+
error = %e,
122+
session_id = %session_id,
123+
"Failed to handle fs_read_text_file"
124+
);
125+
let bytes = error_response_bytes(request_id, code, &message);
126+
publish_reply(nats, reply_to, bytes, "fs_read_text_file error reply").await;
127+
}
128+
}
129+
}
130+
131+
async fn forward_to_client<C: Client>(
132+
payload: &[u8],
133+
client: &C,
134+
) -> Result<ReadTextFileResponse, FsReadTextFileError> {
135+
let envelope: Request<ReadTextFileRequest> =
136+
serde_json::from_slice(payload).map_err(FsReadTextFileError::InvalidRequest)?;
137+
let request = envelope.params.ok_or_else(|| {
138+
FsReadTextFileError::InvalidRequest(
139+
serde_json::from_value::<ReadTextFileRequest>(serde_json::Value::Null).unwrap_err(),
140+
)
141+
})?;
142+
client
143+
.read_text_file(request)
144+
.await
145+
.map_err(FsReadTextFileError::ClientError)
146+
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
use super::*;
151+
use agent_client_protocol::{
152+
ReadTextFileRequest, ReadTextFileResponse, Request, RequestId, RequestPermissionRequest,
153+
RequestPermissionResponse, SessionNotification,
154+
};
155+
use async_trait::async_trait;
156+
157+
struct MockClient {
158+
content: String,
159+
}
160+
161+
impl MockClient {
162+
fn new(content: &str) -> Self {
163+
Self {
164+
content: content.to_string(),
165+
}
166+
}
167+
}
168+
169+
#[async_trait(?Send)]
170+
impl Client for MockClient {
171+
async fn session_notification(
172+
&self,
173+
_: SessionNotification,
174+
) -> agent_client_protocol::Result<()> {
175+
Ok(())
176+
}
177+
178+
async fn request_permission(
179+
&self,
180+
_: RequestPermissionRequest,
181+
) -> agent_client_protocol::Result<RequestPermissionResponse> {
182+
Err(agent_client_protocol::Error::new(
183+
-32603,
184+
"not implemented in test mock",
185+
))
186+
}
187+
188+
async fn read_text_file(
189+
&self,
190+
_: ReadTextFileRequest,
191+
) -> agent_client_protocol::Result<ReadTextFileResponse> {
192+
Ok(ReadTextFileResponse::new(self.content.clone()))
193+
}
194+
}
195+
196+
#[tokio::test]
197+
async fn fs_read_text_file_forwards_request_and_returns_response() {
198+
let client = MockClient::new("hello world");
199+
let request = ReadTextFileRequest::new(
200+
agent_client_protocol::SessionId::from("sess-1"),
201+
"/tmp/foo.txt".to_string(),
202+
);
203+
let envelope = Request {
204+
id: RequestId::Number(1),
205+
method: std::sync::Arc::from("fs/read_text_file"),
206+
params: Some(request),
207+
};
208+
let payload = serde_json::to_vec(&envelope).unwrap();
209+
210+
let result = forward_to_client(&payload, &client).await;
211+
assert!(result.is_ok());
212+
let response = result.unwrap();
213+
assert_eq!(response.content, "hello world");
214+
}
215+
216+
#[tokio::test]
217+
async fn fs_read_text_file_returns_error_when_payload_is_invalid_json() {
218+
let client = MockClient::new("hello");
219+
let result = forward_to_client(b"not json", &client).await;
220+
assert!(result.is_err());
221+
}
222+
223+
#[test]
224+
fn error_code_and_message_invalid_request_returns_invalid_params() {
225+
let err = serde_json::from_slice::<ReadTextFileRequest>(b"not json").unwrap_err();
226+
let fs_err = FsReadTextFileError::InvalidRequest(err);
227+
let (code, message) = error_code_and_message(&fs_err);
228+
assert_eq!(code, ErrorCode::InvalidParams);
229+
assert!(message.contains("Invalid read_text_file request"));
230+
}
231+
232+
#[test]
233+
fn error_code_and_message_client_error_preserves_client_code() {
234+
let client_err =
235+
agent_client_protocol::Error::new(ErrorCode::InvalidParams.into(), "file not found");
236+
let fs_err = FsReadTextFileError::ClientError(client_err);
237+
let (code, message) = error_code_and_message(&fs_err);
238+
assert_eq!(code, ErrorCode::InvalidParams);
239+
assert_eq!(message, "file not found");
240+
}
241+
}

0 commit comments

Comments
 (0)