Skip to content

Commit cfdea48

Browse files
authored
feat(acp-nats): minimal initialize-only ACP-NATS bridge (#10)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent c5f80ac commit cfdea48

12 files changed

Lines changed: 933 additions & 7 deletions

File tree

rsworkspace/Cargo.lock

Lines changed: 306 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ members = ["crates/*"]
66
lto = "thin"
77
strip = "symbols"
88
codegen-units = 1
9-
panic = "abort"
9+
# Note: panic = "abort" removed because Cargo doesn't support per-package panic overrides.
10+
# Bridge crates use catch_unwind for panic isolation, which requires panic = "unwind".
11+
# All workspace crates will use unwinding in release builds for this reason.

rsworkspace/crates/AGENTS.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
Prefer domain-specific value objects over primitives (e.g. `AcpPrefix` not `String`). Each type's factory must guarantee correctness at construction—invalid instances should be unrepresentable. Validate per-type, not per-aggregate: avoid validating unrelated fields together in a single constructor.
2+
13
You must use the `test-support` feature to share test helpers between crates.
24
Prefer one trait per operation over a single trait with multiple operations.
35

@@ -6,3 +8,7 @@ For NATS infrastructure and testing, use the `trogon-nats` crate which provides:
68
- Connection management with auto-reconnect
79
- Request/publish utilities with retry policies
810
- Mock NATS clients (via `test-support` feature)
11+
12+
## Module conventions
13+
14+
Place observability concerns (metrics, tracing spans, logging helpers) under a `telemetry` module within each crate. Example: `acp-nats/src/telemetry/metrics.rs`. This keeps observability code separated from domain logic and provides a consistent location across crates.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "acp-nats"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
agent-client-protocol = "0.9.4"
8+
async-nats = "0.45.0"
9+
async-trait = "0.1.89"
10+
serde = { version = "1.0.228", features = ["derive"] }
11+
serde_json = "1.0.149"
12+
tokio = { version = "1.49.0", features = ["rt", "macros", "sync", "time"] }
13+
tracing = "0.1.44"
14+
15+
trogon-nats = { path = "../trogon-nats" }
16+
17+
[dev-dependencies]
18+
trogon-nats = { path = "../trogon-nats", features = ["test-support"] }
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
use super::Bridge;
2+
use crate::error::AGENT_UNAVAILABLE;
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
4+
use agent_client_protocol::{Error, ErrorCode, InitializeRequest, InitializeResponse, Result};
5+
use tracing::{info, instrument, warn};
6+
use trogon_nats::NatsError;
7+
8+
fn map_initialize_error(e: NatsError) -> Error {
9+
match &e {
10+
NatsError::Timeout { subject } => {
11+
warn!(subject = %subject, "initialize request timed out");
12+
Error::new(
13+
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
14+
"Initialize request timed out; agent may be overloaded or unavailable",
15+
)
16+
}
17+
NatsError::Request { subject, error } => {
18+
warn!(subject = %subject, error = %error, "initialize NATS request failed");
19+
Error::new(
20+
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
21+
format!("Agent unavailable: {}", error),
22+
)
23+
}
24+
NatsError::Serialize(inner) => {
25+
warn!(error = %inner, "failed to serialize initialize request");
26+
Error::new(
27+
ErrorCode::InternalError.into(),
28+
format!("Failed to serialize initialize request: {}", inner),
29+
)
30+
}
31+
NatsError::Deserialize(inner) => {
32+
warn!(error = %inner, "failed to deserialize initialize response");
33+
Error::new(
34+
ErrorCode::InternalError.into(),
35+
"Invalid response from agent",
36+
)
37+
}
38+
_ => {
39+
warn!(error = %e, "initialize NATS request failed");
40+
Error::new(ErrorCode::InternalError.into(), "Initialize request failed")
41+
}
42+
}
43+
}
44+
45+
#[instrument(
46+
name = "acp.initialize",
47+
skip(bridge, args),
48+
fields(protocol_version = ?args.protocol_version)
49+
)]
50+
pub async fn handle<N: RequestClient + PublishClient + FlushClient>(
51+
bridge: &Bridge<N>,
52+
args: InitializeRequest,
53+
) -> Result<InitializeResponse> {
54+
let client_name = args
55+
.client_info
56+
.as_ref()
57+
.map(|c| c.name.as_str())
58+
.unwrap_or("unknown");
59+
60+
info!(client = %client_name, "Initialize request");
61+
62+
let nats = bridge.nats();
63+
let subject = agent::initialize(bridge.config.acp_prefix());
64+
65+
nats::request_with_timeout::<N, InitializeRequest, InitializeResponse>(
66+
nats,
67+
&subject,
68+
&args,
69+
bridge.config.operation_timeout,
70+
)
71+
.await
72+
.map_err(map_initialize_error)
73+
}
74+
75+
#[cfg(test)]
76+
mod tests {
77+
use super::{Bridge, map_initialize_error};
78+
use crate::config::Config;
79+
use crate::error::AGENT_UNAVAILABLE;
80+
use agent_client_protocol::{
81+
Agent, ErrorCode, Implementation, InitializeRequest, InitializeResponse, ProtocolVersion,
82+
};
83+
use trogon_nats::{AdvancedMockNatsClient, NatsError};
84+
85+
fn mock_bridge() -> (AdvancedMockNatsClient, Bridge<AdvancedMockNatsClient>) {
86+
let mock = AdvancedMockNatsClient::new();
87+
let bridge = Bridge::new(mock.clone(), Config::for_test("acp"));
88+
(mock, bridge)
89+
}
90+
91+
fn set_json_response<T: serde::Serialize>(
92+
mock: &AdvancedMockNatsClient,
93+
subject: &str,
94+
resp: &T,
95+
) {
96+
let bytes = serde_json::to_vec(resp).unwrap();
97+
mock.set_response(subject, bytes.into());
98+
}
99+
100+
struct FailsSerialize;
101+
impl serde::Serialize for FailsSerialize {
102+
fn serialize<S: serde::Serializer>(&self, _s: S) -> Result<S::Ok, S::Error> {
103+
Err(serde::ser::Error::custom("test serialize failure"))
104+
}
105+
}
106+
107+
#[tokio::test]
108+
async fn initialize_forwards_request_and_returns_response() {
109+
let (mock, bridge) = mock_bridge();
110+
let expected = InitializeResponse::new(ProtocolVersion::LATEST);
111+
set_json_response(&mock, "acp.agent.initialize", &expected);
112+
113+
let request = InitializeRequest::new(ProtocolVersion::LATEST);
114+
let result = bridge.initialize(request).await;
115+
116+
assert!(result.is_ok());
117+
let response = result.unwrap();
118+
assert_eq!(response.protocol_version, ProtocolVersion::LATEST);
119+
}
120+
121+
#[tokio::test]
122+
async fn initialize_logs_client_name_when_client_info_provided() {
123+
let (mock, bridge) = mock_bridge();
124+
let expected = InitializeResponse::new(ProtocolVersion::LATEST);
125+
set_json_response(&mock, "acp.agent.initialize", &expected);
126+
127+
let request = InitializeRequest::new(ProtocolVersion::LATEST)
128+
.client_info(Implementation::new("my-client", "1.0.0"));
129+
let result = bridge.initialize(request).await;
130+
131+
assert!(result.is_ok());
132+
}
133+
134+
#[tokio::test]
135+
async fn initialize_returns_error_when_nats_request_fails() {
136+
let (mock, bridge) = mock_bridge();
137+
mock.fail_next_request();
138+
139+
let request = InitializeRequest::new(ProtocolVersion::LATEST);
140+
let err = bridge.initialize(request).await.unwrap_err();
141+
142+
assert!(err.to_string().contains("Agent unavailable"));
143+
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
144+
}
145+
146+
#[tokio::test]
147+
async fn initialize_returns_error_when_response_is_invalid_json() {
148+
let (mock, bridge) = mock_bridge();
149+
mock.set_response("acp.agent.initialize", "not json".into());
150+
151+
let request = InitializeRequest::new(ProtocolVersion::LATEST);
152+
let err = bridge.initialize(request).await.unwrap_err();
153+
154+
assert!(err.to_string().contains("Invalid response from agent"));
155+
assert_eq!(err.code, ErrorCode::InternalError);
156+
}
157+
158+
#[test]
159+
fn map_initialize_error_timeout() {
160+
let err = map_initialize_error(NatsError::Timeout {
161+
subject: "acp.agent.initialize".into(),
162+
});
163+
assert!(err.to_string().contains("timed out"));
164+
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
165+
}
166+
167+
#[test]
168+
fn map_initialize_error_request() {
169+
let err = map_initialize_error(NatsError::Request {
170+
subject: "acp.agent.initialize".into(),
171+
error: "connection refused".into(),
172+
});
173+
assert!(err.to_string().contains("Agent unavailable"));
174+
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
175+
}
176+
177+
#[test]
178+
fn map_initialize_error_serialize() {
179+
let serde_err = serde_json::to_vec(&FailsSerialize).unwrap_err();
180+
let err = map_initialize_error(NatsError::Serialize(serde_err));
181+
assert!(err.to_string().contains("serialize"));
182+
assert_eq!(err.code, ErrorCode::InternalError);
183+
}
184+
185+
#[test]
186+
fn map_initialize_error_deserialize() {
187+
let serde_err = serde_json::from_str::<InitializeResponse>("{}").unwrap_err();
188+
let err = map_initialize_error(NatsError::Deserialize(serde_err));
189+
assert!(err.to_string().contains("Invalid response from agent"));
190+
assert_eq!(err.code, ErrorCode::InternalError);
191+
}
192+
193+
#[test]
194+
fn map_initialize_error_other() {
195+
let err = map_initialize_error(NatsError::Other("misc failure".into()));
196+
assert!(err.to_string().contains("Initialize request failed"));
197+
assert_eq!(err.code, ErrorCode::InternalError);
198+
}
199+
}

0 commit comments

Comments
 (0)