Skip to content

Commit 608f52e

Browse files
committed
feat(acp-nats): add fs_read_text_file client handler
1 parent c4beead commit 608f52e

5 files changed

Lines changed: 273 additions & 4 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use agent_client_protocol::{Client, ErrorCode, ReadTextFileRequest};
2+
use tracing::instrument;
3+
4+
#[derive(Debug)]
5+
pub enum FsReadTextFileError {
6+
InvalidRequest(serde_json::Error),
7+
ClientError(agent_client_protocol::Error),
8+
SerializationError(serde_json::Error),
9+
}
10+
11+
impl std::fmt::Display for FsReadTextFileError {
12+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
13+
match self {
14+
Self::InvalidRequest(e) => write!(f, "invalid request: {}", e),
15+
Self::ClientError(e) => write!(f, "client error: {}", e),
16+
Self::SerializationError(e) => write!(f, "serialization error: {}", e),
17+
}
18+
}
19+
}
20+
21+
impl std::error::Error for FsReadTextFileError {
22+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
23+
match self {
24+
Self::InvalidRequest(e) => Some(e),
25+
Self::ClientError(e) => Some(e),
26+
Self::SerializationError(e) => Some(e),
27+
}
28+
}
29+
}
30+
31+
pub fn error_code_and_message(e: &FsReadTextFileError) -> (ErrorCode, String) {
32+
match e {
33+
FsReadTextFileError::InvalidRequest(inner) => (
34+
ErrorCode::InvalidParams,
35+
format!("Invalid read_text_file request: {}", inner),
36+
),
37+
FsReadTextFileError::ClientError(inner) => (inner.code.clone(), inner.message.clone()),
38+
FsReadTextFileError::SerializationError(inner) => (
39+
ErrorCode::InternalError,
40+
format!("Failed to serialize read_text_file response: {}", inner),
41+
),
42+
}
43+
}
44+
45+
/// Forwards read_text_file to the client. NATS enforces payload limits when publishing.
46+
#[instrument(name = "acp.client.fs.read_text_file", skip(payload, client))]
47+
pub async fn handle<C: Client>(
48+
payload: &[u8],
49+
client: &C,
50+
) -> Result<Vec<u8>, FsReadTextFileError> {
51+
let request: ReadTextFileRequest =
52+
serde_json::from_slice(payload).map_err(FsReadTextFileError::InvalidRequest)?;
53+
let response = client
54+
.read_text_file(request)
55+
.await
56+
.map_err(FsReadTextFileError::ClientError)?;
57+
serde_json::to_vec(&response).map_err(FsReadTextFileError::SerializationError)
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use super::*;
63+
use agent_client_protocol::{ReadTextFileRequest, ReadTextFileResponse};
64+
use agent_client_protocol::RequestPermissionRequest;
65+
use agent_client_protocol::RequestPermissionResponse;
66+
use agent_client_protocol::SessionNotification;
67+
use async_trait::async_trait;
68+
69+
struct MockClient {
70+
content: String,
71+
}
72+
73+
impl MockClient {
74+
fn new(content: &str) -> Self {
75+
Self {
76+
content: content.to_string(),
77+
}
78+
}
79+
}
80+
81+
#[async_trait(?Send)]
82+
impl Client for MockClient {
83+
async fn session_notification(
84+
&self,
85+
_: SessionNotification,
86+
) -> agent_client_protocol::Result<()> {
87+
Ok(())
88+
}
89+
90+
async fn request_permission(
91+
&self,
92+
_: RequestPermissionRequest,
93+
) -> agent_client_protocol::Result<RequestPermissionResponse> {
94+
Err(agent_client_protocol::Error::new(
95+
-32603,
96+
"not implemented in test mock",
97+
))
98+
}
99+
100+
async fn read_text_file(
101+
&self,
102+
_: ReadTextFileRequest,
103+
) -> agent_client_protocol::Result<ReadTextFileResponse> {
104+
Ok(ReadTextFileResponse::new(self.content.clone()))
105+
}
106+
}
107+
108+
#[tokio::test]
109+
async fn fs_read_text_file_forwards_request_and_returns_response() {
110+
let client = MockClient::new("hello world");
111+
let request = ReadTextFileRequest::new(
112+
agent_client_protocol::SessionId::from("sess-1"),
113+
"/tmp/foo.txt".to_string(),
114+
);
115+
let payload = serde_json::to_vec(&request).unwrap();
116+
117+
let result = handle(&payload, &client).await;
118+
assert!(result.is_ok());
119+
let response = serde_json::from_slice::<ReadTextFileResponse>(&result.unwrap()).unwrap();
120+
assert_eq!(response.content, "hello world");
121+
}
122+
123+
#[tokio::test]
124+
async fn fs_read_text_file_returns_error_when_payload_is_invalid_json() {
125+
let client = MockClient::new("hello");
126+
let result = handle(b"not json", &client).await;
127+
assert!(result.is_err());
128+
}
129+
130+
#[test]
131+
fn error_code_and_message_invalid_request_returns_invalid_params() {
132+
let err = serde_json::from_slice::<ReadTextFileRequest>(b"not json")
133+
.unwrap_err();
134+
let fs_err = FsReadTextFileError::InvalidRequest(err);
135+
let (code, message) = error_code_and_message(&fs_err);
136+
assert_eq!(code, ErrorCode::InvalidParams);
137+
assert!(message.contains("Invalid read_text_file request"));
138+
}
139+
140+
#[test]
141+
fn error_code_and_message_client_error_preserves_client_code() {
142+
let client_err = agent_client_protocol::Error::new(
143+
ErrorCode::InvalidParams.into(),
144+
"file not found",
145+
);
146+
let fs_err = FsReadTextFileError::ClientError(client_err);
147+
let (code, message) = error_code_and_message(&fs_err);
148+
assert_eq!(code, ErrorCode::InvalidParams);
149+
assert_eq!(message, "file not found");
150+
}
151+
152+
#[test]
153+
fn error_code_and_message_serialization_error_returns_internal_error() {
154+
struct Unserializable;
155+
impl serde::Serialize for Unserializable {
156+
fn serialize<S: serde::Serializer>(
157+
&self,
158+
_: S,
159+
) -> Result<S::Ok, S::Error> {
160+
Err(serde::ser::Error::custom("test serialization failure"))
161+
}
162+
}
163+
let err = serde_json::to_vec(&Unserializable).unwrap_err();
164+
let fs_err = FsReadTextFileError::SerializationError(err);
165+
let (code, message) = error_code_and_message(&fs_err);
166+
assert_eq!(code, ErrorCode::InternalError);
167+
assert!(message.contains("Failed to serialize read_text_file response"));
168+
}
169+
}

rsworkspace/crates/acp-nats/src/client/mod.rs

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
pub(crate) mod fs_read_text_file;
12
pub(crate) mod session_update;
23

34
use crate::agent::Bridge;
45
use crate::in_flight_slot_guard::InFlightSlotGuard;
56
use crate::nats::{
67
ClientMethod, FlushClient, PublishClient, RequestClient, SubscribeClient, client,
7-
parse_client_subject,
8+
headers_with_trace_context, parse_client_subject,
89
};
910
use agent_client_protocol::Client;
11+
use agent_client_protocol::ErrorCode;
1012
use async_nats::Message;
1113
use bytes::Bytes;
1214
use futures::StreamExt;
@@ -100,6 +102,7 @@ async fn process_message<
100102
}
101103

102104
let payload = msg.payload.clone();
105+
let reply = msg.reply.as_ref().map(|r| r.to_string());
103106
let nats = nats.clone();
104107

105108
let bridge_clone = bridge.clone();
@@ -110,6 +113,7 @@ async fn process_message<
110113
&subject,
111114
parsed,
112115
payload,
116+
reply,
113117
&nats,
114118
client.as_ref(),
115119
bridge_clone.as_ref(),
@@ -118,7 +122,39 @@ async fn process_message<
118122
});
119123
}
120124

121-
#[instrument(skip(payload, _nats, client, _bridge), fields(subject = %subject, session_id = tracing::field::Empty))]
125+
fn jsonrpc_error_response(
126+
request_id: serde_json::Value,
127+
code: ErrorCode,
128+
message: &str,
129+
) -> bytes::Bytes {
130+
let response = serde_json::json!({
131+
"jsonrpc": "2.0",
132+
"id": request_id,
133+
"error": {
134+
"code": i32::from(code),
135+
"message": message
136+
}
137+
});
138+
serde_json::to_vec(&response)
139+
.unwrap_or_else(|e| {
140+
format!(
141+
"{{\"jsonrpc\":\"2.0\",\"id\":null,\"error\":{{\"code\":{},\"message\":\"failed to serialize error response: {}\"}}}}",
142+
i32::from(code),
143+
e
144+
)
145+
.into_bytes()
146+
})
147+
.into()
148+
}
149+
150+
fn extract_request_id(payload: &[u8]) -> serde_json::Value {
151+
serde_json::from_slice::<serde_json::Value>(payload)
152+
.ok()
153+
.and_then(|v| v.get("id").cloned())
154+
.unwrap_or(serde_json::Value::Null)
155+
}
156+
157+
#[instrument(skip(payload, nats, client, _bridge), fields(subject = %subject, session_id = tracing::field::Empty))]
122158
async fn dispatch_client_method<
123159
N: SubscribeClient + RequestClient + PublishClient + FlushClient,
124160
Cl: Client,
@@ -127,13 +163,54 @@ async fn dispatch_client_method<
127163
subject: &str,
128164
parsed: crate::nats::ParsedClientSubject,
129165
payload: Bytes,
130-
_nats: &N,
166+
reply: Option<String>,
167+
nats: &N,
131168
client: &Cl,
132169
_bridge: &Bridge<N, C>,
133170
) {
134171
Span::current().record("session_id", parsed.session_id.as_str());
135172

136173
match parsed.method {
174+
ClientMethod::FsReadTextFile => {
175+
let request_id = extract_request_id(&payload);
176+
match fs_read_text_file::handle(&payload, client).await {
177+
Ok(bytes) => {
178+
if let Some(reply_to) = &reply {
179+
let headers = headers_with_trace_context();
180+
if let Err(e) = nats
181+
.publish_with_headers(reply_to.clone(), headers, bytes.into())
182+
.await
183+
{
184+
error!(error = %e, "Failed to publish fs_read_text_file reply");
185+
}
186+
if let Err(e) = nats.flush().await {
187+
warn!(error = %e, "Failed to flush fs_read_text_file reply");
188+
}
189+
}
190+
}
191+
Err(e) => {
192+
let (code, message) = fs_read_text_file::error_code_and_message(&e);
193+
error!(
194+
error = %e,
195+
session_id = %parsed.session_id,
196+
"Failed to handle fs_read_text_file"
197+
);
198+
if let Some(reply_to) = &reply {
199+
let bytes = jsonrpc_error_response(request_id, code, &message);
200+
let headers = headers_with_trace_context();
201+
if let Err(e) = nats
202+
.publish_with_headers(reply_to.clone(), headers, bytes)
203+
.await
204+
{
205+
error!(error = %e, "Failed to publish fs_read_text_file error reply");
206+
}
207+
if let Err(e) = nats.flush().await {
208+
warn!(error = %e, "Failed to flush fs_read_text_file error reply");
209+
}
210+
}
211+
}
212+
}
213+
}
137214
ClientMethod::SessionUpdate => {
138215
session_update::handle(&payload, client, &parsed.session_id).await;
139216
}
@@ -286,6 +363,7 @@ mod tests {
286363
"acp.sess-1.client.session.update",
287364
parsed,
288365
payload,
366+
None,
289367
&nats,
290368
&client,
291369
&bridge,

rsworkspace/crates/acp-nats/src/nats/client_method.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#[derive(Debug, Clone, PartialEq, Eq)]
22
pub enum ClientMethod {
3+
FsReadTextFile,
34
SessionUpdate,
45
}
56

67
impl ClientMethod {
78
pub fn from_subject_suffix(suffix: &str) -> Option<Self> {
89
match suffix {
10+
"client.fs.read_text_file" => Some(Self::FsReadTextFile),
911
"client.session.update" => Some(Self::SessionUpdate),
1012
_ => None,
1113
}

rsworkspace/crates/acp-nats/src/nats/parsing.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ pub fn parse_client_subject(subject: &str) -> Option<ParsedClientSubject> {
2020
mod tests {
2121
use super::*;
2222

23+
#[test]
24+
fn test_parse_fs_read_text_file() {
25+
let subject = "acp.sess123.client.fs.read_text_file";
26+
let parsed = parse_client_subject(subject).unwrap();
27+
assert_eq!(parsed.session_id.as_str(), "sess123");
28+
assert_eq!(parsed.method, ClientMethod::FsReadTextFile);
29+
}
30+
2331
#[test]
2432
fn test_parse_session_update() {
2533
let subject = "acp.sess123.client.session.update";
@@ -62,7 +70,7 @@ mod tests {
6270

6371
#[test]
6472
fn test_parse_unknown_method() {
65-
assert!(parse_client_subject("acp.sess123.client.fs.read_text_file").is_none());
73+
assert!(parse_client_subject("acp.sess123.client.unknown.method").is_none());
6674
}
6775

6876
#[test]

rsworkspace/crates/acp-nats/src/nats/subjects.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ pub mod agent {
3939
}
4040

4141
pub mod client {
42+
pub fn fs_read_text_file(prefix: &str, session_id: &str) -> String {
43+
format!("{}.{}.client.fs.read_text_file", prefix, session_id)
44+
}
45+
4246
pub fn session_update(prefix: &str, session_id: &str) -> String {
4347
format!("{}.{}.client.session.update", prefix, session_id)
4448
}
@@ -55,6 +59,14 @@ mod tests {
5559
use super::agent;
5660
use super::client;
5761

62+
#[test]
63+
fn client_fs_read_text_file_subject() {
64+
assert_eq!(
65+
client::fs_read_text_file("acp", "s1"),
66+
"acp.s1.client.fs.read_text_file"
67+
);
68+
}
69+
5870
#[test]
5971
fn client_session_update_subject() {
6072
assert_eq!(

0 commit comments

Comments
 (0)