Skip to content

Commit 9100b5c

Browse files
authored
feat(acp-nats): complete ACP 0.10.2 agent method coverage (#48)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 64b028f commit 9100b5c

File tree

12 files changed

+2499
-7
lines changed

12 files changed

+2499
-7
lines changed

rsworkspace/crates/acp-nats/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,17 @@ edition = "2024"
77
workspace = true
88

99
[dependencies]
10-
agent-client-protocol = { version = "0.10.2", features = ["unstable_session_model", "unstable_session_fork", "unstable_session_resume", "unstable_session_usage"] }
10+
agent-client-protocol = { version = "0.10.2", features = [
11+
"unstable_auth_methods",
12+
"unstable_boolean_config",
13+
"unstable_cancel_request",
14+
"unstable_message_id",
15+
"unstable_session_close",
16+
"unstable_session_fork",
17+
"unstable_session_model",
18+
"unstable_session_resume",
19+
"unstable_session_usage",
20+
] }
1121
opentelemetry = "0.31.0"
1222
async-nats = { version = "0.46.0", default-features = false }
1323
async-trait = "0.1.89"
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
use super::Bridge;
2+
use crate::error::map_nats_error;
3+
use crate::nats::{self, RequestClient, agent};
4+
use crate::session_id::AcpSessionId;
5+
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
6+
use tracing::{info, instrument};
7+
use trogon_std::time::GetElapsed;
8+
9+
#[instrument(
10+
name = "acp.session.close",
11+
skip(bridge, args),
12+
fields(session_id = %args.session_id)
13+
)]
14+
pub async fn handle<N: RequestClient, C: GetElapsed>(
15+
bridge: &Bridge<N, C>,
16+
args: CloseSessionRequest,
17+
) -> Result<CloseSessionResponse> {
18+
let start = bridge.clock.now();
19+
20+
info!(session_id = %args.session_id, "Close session request");
21+
22+
let session_id = AcpSessionId::try_from(&args.session_id).map_err(|e| {
23+
bridge
24+
.metrics
25+
.record_error("session_validate", "invalid_session_id");
26+
Error::new(
27+
ErrorCode::InvalidParams.into(),
28+
format!("Invalid session ID: {}", e),
29+
)
30+
})?;
31+
let nats = bridge.nats();
32+
let subject = agent::session_close(bridge.config.acp_prefix(), session_id.as_str());
33+
34+
let result = nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
35+
nats,
36+
&subject,
37+
&args,
38+
bridge.config.operation_timeout,
39+
)
40+
.await
41+
.map_err(map_nats_error);
42+
43+
bridge.metrics.record_request(
44+
"close_session",
45+
bridge.clock.elapsed(start).as_secs_f64(),
46+
result.is_ok(),
47+
);
48+
49+
result
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use super::*;
55+
use crate::config::Config;
56+
use crate::error::AGENT_UNAVAILABLE;
57+
use agent_client_protocol::{Agent, CloseSessionRequest, CloseSessionResponse, ErrorCode};
58+
use opentelemetry::Value;
59+
use opentelemetry::metrics::MeterProvider;
60+
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
61+
use opentelemetry_sdk::metrics::{
62+
PeriodicReader, SdkMeterProvider, in_memory_exporter::InMemoryMetricExporter,
63+
};
64+
use std::time::Duration;
65+
use trogon_nats::AdvancedMockNatsClient;
66+
67+
fn mock_bridge() -> (
68+
AdvancedMockNatsClient,
69+
Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock>,
70+
) {
71+
let mock = AdvancedMockNatsClient::new();
72+
let bridge = Bridge::new(
73+
mock.clone(),
74+
trogon_std::time::SystemClock,
75+
&opentelemetry::global::meter("acp-nats-test"),
76+
Config::for_test("acp"),
77+
tokio::sync::mpsc::channel(1).0,
78+
);
79+
(mock, bridge)
80+
}
81+
82+
fn mock_bridge_with_metrics() -> (
83+
AdvancedMockNatsClient,
84+
Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock>,
85+
InMemoryMetricExporter,
86+
SdkMeterProvider,
87+
) {
88+
let exporter = InMemoryMetricExporter::default();
89+
let reader = PeriodicReader::builder(exporter.clone())
90+
.with_interval(Duration::from_millis(100))
91+
.build();
92+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
93+
let meter = provider.meter("acp-nats-test");
94+
95+
let mock = AdvancedMockNatsClient::new();
96+
let bridge = Bridge::new(
97+
mock.clone(),
98+
trogon_std::time::SystemClock,
99+
&meter,
100+
Config::for_test("acp"),
101+
tokio::sync::mpsc::channel(1).0,
102+
);
103+
(mock, bridge, exporter, provider)
104+
}
105+
106+
fn set_json_response<T: serde::Serialize>(
107+
mock: &AdvancedMockNatsClient,
108+
subject: &str,
109+
resp: &T,
110+
) {
111+
let bytes = serde_json::to_vec(resp).unwrap();
112+
mock.set_response(subject, bytes.into());
113+
}
114+
115+
fn has_request_metric(
116+
finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
117+
method: &str,
118+
expected_success: bool,
119+
) -> bool {
120+
finished_metrics
121+
.iter()
122+
.flat_map(|rm| rm.scope_metrics())
123+
.flat_map(|sm| sm.metrics())
124+
.find(|m| m.name() == "acp.requests")
125+
.and_then(|metric| {
126+
let data = metric.data();
127+
if let AggregatedMetrics::U64(MetricData::Sum(s)) = data {
128+
s.data_points()
129+
.find(|dp| {
130+
let mut method_ok = false;
131+
let mut success_ok = false;
132+
for attr in dp.attributes() {
133+
if attr.key.as_str() == "method" {
134+
method_ok = attr.value.as_str() == method;
135+
} else if attr.key.as_str() == "success" {
136+
success_ok = attr.value == Value::from(expected_success);
137+
}
138+
}
139+
method_ok && success_ok
140+
})
141+
.map(|_| ())
142+
} else {
143+
None
144+
}
145+
})
146+
.is_some()
147+
}
148+
149+
#[tokio::test]
150+
async fn close_session_forwards_request_and_returns_response() {
151+
let (mock, bridge) = mock_bridge();
152+
let expected = CloseSessionResponse::new();
153+
set_json_response(&mock, "acp.s1.agent.session.close", &expected);
154+
155+
let request = CloseSessionRequest::new("s1");
156+
let result = bridge.close_session(request).await;
157+
assert!(result.is_ok());
158+
}
159+
160+
#[tokio::test]
161+
async fn close_session_returns_error_when_nats_fails() {
162+
let (mock, bridge) = mock_bridge();
163+
mock.fail_next_request();
164+
165+
let request = CloseSessionRequest::new("s1");
166+
let err = bridge.close_session(request).await.unwrap_err();
167+
168+
assert_eq!(err.code, ErrorCode::Other(AGENT_UNAVAILABLE));
169+
}
170+
171+
#[tokio::test]
172+
async fn close_session_returns_error_when_response_is_invalid_json() {
173+
let (mock, bridge) = mock_bridge();
174+
mock.set_response("acp.s1.agent.session.close", "not json".into());
175+
176+
let request = CloseSessionRequest::new("s1");
177+
let err = bridge.close_session(request).await.unwrap_err();
178+
179+
assert_eq!(err.code, ErrorCode::InternalError);
180+
}
181+
182+
#[tokio::test]
183+
async fn close_session_validates_session_id() {
184+
let (_mock, bridge) = mock_bridge();
185+
let request = CloseSessionRequest::new("invalid.session.id");
186+
let err = bridge.close_session(request).await.unwrap_err();
187+
188+
assert!(err.to_string().contains("Invalid session ID"));
189+
assert_eq!(err.code, ErrorCode::InvalidParams);
190+
}
191+
192+
#[tokio::test]
193+
async fn close_session_records_metrics_on_success() {
194+
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
195+
set_json_response(
196+
&mock,
197+
"acp.s1.agent.session.close",
198+
&CloseSessionResponse::new(),
199+
);
200+
201+
let _ = bridge.close_session(CloseSessionRequest::new("s1")).await;
202+
203+
provider.force_flush().unwrap();
204+
let finished_metrics = exporter.get_finished_metrics().unwrap();
205+
assert!(
206+
has_request_metric(&finished_metrics, "close_session", true),
207+
"expected acp.requests with method=close_session, success=true"
208+
);
209+
provider.shutdown().unwrap();
210+
}
211+
212+
#[tokio::test]
213+
async fn close_session_records_metrics_on_failure() {
214+
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
215+
mock.fail_next_request();
216+
217+
let _ = bridge.close_session(CloseSessionRequest::new("s1")).await;
218+
219+
provider.force_flush().unwrap();
220+
let finished_metrics = exporter.get_finished_metrics().unwrap();
221+
assert!(
222+
has_request_metric(&finished_metrics, "close_session", false),
223+
"expected acp.requests with method=close_session, success=false"
224+
);
225+
provider.shutdown().unwrap();
226+
}
227+
228+
#[test]
229+
fn has_request_metric_returns_false_when_metric_is_histogram() {
230+
let exporter = InMemoryMetricExporter::default();
231+
let reader = PeriodicReader::builder(exporter.clone())
232+
.with_interval(Duration::from_millis(100))
233+
.build();
234+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
235+
let meter = provider.meter("test");
236+
let histogram = meter
237+
.f64_histogram("acp.requests")
238+
.with_description("test")
239+
.build();
240+
histogram.record(1.0, &[]);
241+
provider.force_flush().unwrap();
242+
let finished_metrics = exporter.get_finished_metrics().unwrap();
243+
assert!(!has_request_metric(
244+
&finished_metrics,
245+
"close_session",
246+
true
247+
));
248+
provider.shutdown().unwrap();
249+
}
250+
}

0 commit comments

Comments
 (0)