Skip to content

Commit 48ab41d

Browse files
authored
feat(acp-nats): add ext_notification handler (#19)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent b22ef6c commit 48ab41d

2 files changed

Lines changed: 356 additions & 18 deletions

File tree

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
use super::Bridge;
2+
use crate::ext_method_name::ExtMethodName;
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
4+
use agent_client_protocol::{Error, ErrorCode, ExtNotification, Result};
5+
use tracing::{info, instrument, warn};
6+
use trogon_std::time::GetElapsed;
7+
8+
/// Handles extension notification requests (fire-and-forget).
9+
///
10+
/// Publishes to NATS; payload size and other validation are left to NATS.
11+
/// Publish failure is logged and recorded as a metric but does not propagate
12+
/// to the caller, so the client always receives `Ok(())`.
13+
#[instrument(
14+
name = "acp.ext.notification",
15+
skip(bridge, args),
16+
fields(method = %args.method)
17+
)]
18+
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
19+
bridge: &Bridge<N, C>,
20+
args: ExtNotification,
21+
) -> Result<()> {
22+
let start = bridge.clock.now();
23+
24+
info!(method = %args.method, "Extension notification");
25+
26+
let method_name = ExtMethodName::new(&args.method).map_err(|e| {
27+
bridge.metrics.record_request(
28+
"ext_notification",
29+
bridge.clock.elapsed(start).as_secs_f64(),
30+
false,
31+
);
32+
bridge
33+
.metrics
34+
.record_error("ext_notification", "invalid_method_name");
35+
Error::new(
36+
ErrorCode::InvalidParams.into(),
37+
format!("Invalid method name: {}", e),
38+
)
39+
})?;
40+
41+
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
42+
43+
let publish_result = nats::publish(
44+
bridge.nats(),
45+
&subject,
46+
&args,
47+
nats::PublishOptions::builder()
48+
.flush_policy(nats::FlushPolicy::no_retries())
49+
.build(),
50+
)
51+
.await;
52+
53+
if let Err(error) = publish_result {
54+
warn!(
55+
method = %args.method,
56+
error = %error,
57+
"Failed to publish ext_notification to backend"
58+
);
59+
bridge
60+
.metrics
61+
.record_error("ext_notification", "ext_notification_publish_failed");
62+
}
63+
64+
bridge.metrics.record_request(
65+
"ext_notification",
66+
bridge.clock.elapsed(start).as_secs_f64(),
67+
true,
68+
);
69+
70+
Ok(())
71+
}
72+
73+
#[cfg(test)]
74+
mod tests {
75+
use super::*;
76+
use crate::config::Config;
77+
use agent_client_protocol::{Agent, ErrorCode, ExtNotification};
78+
use opentelemetry::Value;
79+
use opentelemetry::metrics::MeterProvider;
80+
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
81+
use opentelemetry_sdk::metrics::{
82+
PeriodicReader, SdkMeterProvider, in_memory_exporter::InMemoryMetricExporter,
83+
};
84+
use serde_json::value::RawValue;
85+
use std::time::Duration;
86+
use trogon_nats::AdvancedMockNatsClient;
87+
88+
fn mock_bridge() -> (
89+
AdvancedMockNatsClient,
90+
Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock>,
91+
) {
92+
let mock = AdvancedMockNatsClient::new();
93+
let bridge = Bridge::new(
94+
mock.clone(),
95+
trogon_std::time::SystemClock,
96+
&opentelemetry::global::meter("acp-nats-test"),
97+
Config::for_test("acp"),
98+
);
99+
(mock, bridge)
100+
}
101+
102+
fn mock_bridge_with_metrics() -> (
103+
AdvancedMockNatsClient,
104+
Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock>,
105+
InMemoryMetricExporter,
106+
SdkMeterProvider,
107+
) {
108+
let exporter = InMemoryMetricExporter::default();
109+
let reader = PeriodicReader::builder(exporter.clone())
110+
.with_interval(Duration::from_millis(100))
111+
.build();
112+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
113+
let meter = provider.meter("acp-nats-test");
114+
115+
let mock = AdvancedMockNatsClient::new();
116+
let bridge = Bridge::new(
117+
mock.clone(),
118+
trogon_std::time::SystemClock,
119+
&meter,
120+
Config::for_test("acp"),
121+
);
122+
(mock, bridge, exporter, provider)
123+
}
124+
125+
fn has_request_metric(
126+
finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
127+
method: &str,
128+
expected_success: bool,
129+
) -> bool {
130+
finished_metrics
131+
.iter()
132+
.flat_map(|rm| rm.scope_metrics())
133+
.flat_map(|sm| sm.metrics())
134+
.find(|m| m.name() == "acp.request.count")
135+
.and_then(|metric| {
136+
let data = metric.data();
137+
if let AggregatedMetrics::U64(MetricData::Sum(s)) = data {
138+
s.data_points()
139+
.find(|dp| {
140+
let mut method_ok = false;
141+
let mut success_ok = false;
142+
for attr in dp.attributes() {
143+
if attr.key.as_str() == "method" {
144+
method_ok = attr.value.as_str() == method;
145+
} else if attr.key.as_str() == "success" {
146+
success_ok = attr.value == Value::from(expected_success);
147+
}
148+
}
149+
method_ok && success_ok
150+
})
151+
.map(|_| ())
152+
} else {
153+
None
154+
}
155+
})
156+
.is_some()
157+
}
158+
159+
fn has_error_metric(
160+
finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
161+
operation: &str,
162+
reason: &str,
163+
) -> bool {
164+
finished_metrics
165+
.iter()
166+
.flat_map(|rm| rm.scope_metrics())
167+
.flat_map(|sm| sm.metrics())
168+
.find(|m| m.name() == "acp.errors.total")
169+
.and_then(|metric| {
170+
let data = metric.data();
171+
if let AggregatedMetrics::U64(MetricData::Sum(s)) = data {
172+
s.data_points()
173+
.find(|dp| {
174+
let mut operation_ok = false;
175+
let mut reason_ok = false;
176+
for attr in dp.attributes() {
177+
if attr.key.as_str() == "operation" {
178+
operation_ok = attr.value.as_str() == operation;
179+
} else if attr.key.as_str() == "reason" {
180+
reason_ok = attr.value.as_str() == reason;
181+
}
182+
}
183+
operation_ok && reason_ok
184+
})
185+
.map(|_| ())
186+
} else {
187+
None
188+
}
189+
})
190+
.is_some()
191+
}
192+
193+
#[tokio::test]
194+
async fn ext_notification_publishes_to_nats() {
195+
let (mock, bridge) = mock_bridge();
196+
197+
let params = RawValue::from_string(r#"{"event":"ping"}"#.to_string()).unwrap();
198+
let notification = ExtNotification::new("my_notify", params.into());
199+
let result = bridge.ext_notification(notification).await;
200+
201+
assert!(result.is_ok());
202+
203+
let published = mock.published_messages();
204+
assert!(
205+
published.iter().any(|s| s.contains("agent.ext.my_notify")),
206+
"Expected ext notification publish, got: {:?}",
207+
published
208+
);
209+
}
210+
211+
#[tokio::test]
212+
async fn ext_notification_validates_method_name() {
213+
let (_mock, bridge) = mock_bridge();
214+
let params = RawValue::from_string("{}".to_string()).unwrap();
215+
let notification = ExtNotification::new("method.*", params.into());
216+
let err = bridge.ext_notification(notification).await.unwrap_err();
217+
218+
assert!(err.message.contains("Invalid method name"));
219+
assert_eq!(err.code, ErrorCode::InvalidParams);
220+
}
221+
222+
#[tokio::test]
223+
async fn ext_notification_records_request_metric_on_invalid_method_name() {
224+
let (_mock, bridge, exporter, provider) = mock_bridge_with_metrics();
225+
226+
let _ = bridge
227+
.ext_notification(ExtNotification::new(
228+
"method.*",
229+
RawValue::from_string("{}".to_string()).unwrap().into(),
230+
))
231+
.await;
232+
233+
provider.force_flush().unwrap();
234+
let finished_metrics = exporter.get_finished_metrics().unwrap();
235+
assert!(
236+
has_request_metric(&finished_metrics, "ext_notification", false),
237+
"expected acp.request.count with method=ext_notification, success=false on validation failure"
238+
);
239+
assert!(
240+
has_error_metric(&finished_metrics, "ext_notification", "invalid_method_name"),
241+
"expected acp.errors.total with operation=ext_notification, reason=invalid_method_name"
242+
);
243+
provider.shutdown().unwrap();
244+
}
245+
246+
#[tokio::test]
247+
async fn ext_notification_records_metrics_on_success() {
248+
let (_mock, bridge, exporter, provider) = mock_bridge_with_metrics();
249+
250+
let _ = bridge
251+
.ext_notification(ExtNotification::new(
252+
"my_notify",
253+
RawValue::from_string("{}".to_string()).unwrap().into(),
254+
))
255+
.await;
256+
257+
provider.force_flush().unwrap();
258+
let finished_metrics = exporter.get_finished_metrics().unwrap();
259+
assert!(
260+
has_request_metric(&finished_metrics, "ext_notification", true),
261+
"expected acp.request.count with method=ext_notification, success=true"
262+
);
263+
provider.shutdown().unwrap();
264+
}
265+
266+
#[tokio::test]
267+
async fn ext_notification_returns_ok_when_publish_fails() {
268+
let (mock, bridge) = mock_bridge();
269+
mock.fail_publish_count(1);
270+
271+
let params = RawValue::from_string("{}".to_string()).unwrap();
272+
let notification = ExtNotification::new("my_notify", params.into());
273+
let result = bridge.ext_notification(notification).await;
274+
275+
assert!(result.is_ok(), "fire-and-forget: caller always gets Ok(())");
276+
}
277+
278+
#[tokio::test]
279+
async fn ext_notification_records_error_metric_on_publish_failure() {
280+
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
281+
mock.fail_publish_count(1);
282+
283+
let _ = bridge
284+
.ext_notification(ExtNotification::new(
285+
"my_notify",
286+
RawValue::from_string("{}".to_string()).unwrap().into(),
287+
))
288+
.await;
289+
290+
provider.force_flush().unwrap();
291+
let finished_metrics = exporter.get_finished_metrics().unwrap();
292+
assert!(
293+
has_error_metric(
294+
&finished_metrics,
295+
"ext_notification",
296+
"ext_notification_publish_failed"
297+
),
298+
"expected acp.errors.total with operation=ext_notification, reason=ext_notification_publish_failed"
299+
);
300+
assert!(
301+
has_request_metric(&finished_metrics, "ext_notification", true),
302+
"publish failure is fire-and-forget; caller still gets Ok, so success=true"
303+
);
304+
provider.shutdown().unwrap();
305+
}
306+
307+
#[test]
308+
fn has_request_metric_returns_false_when_metric_is_histogram() {
309+
let exporter = InMemoryMetricExporter::default();
310+
let reader = PeriodicReader::builder(exporter.clone())
311+
.with_interval(Duration::from_millis(100))
312+
.build();
313+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
314+
let meter = provider.meter("test");
315+
let histogram = meter
316+
.f64_histogram("acp.request.count")
317+
.with_description("test")
318+
.build();
319+
histogram.record(1.0, &[]);
320+
provider.force_flush().unwrap();
321+
let finished_metrics = exporter.get_finished_metrics().unwrap();
322+
assert!(!has_request_metric(
323+
&finished_metrics,
324+
"ext_notification",
325+
true
326+
));
327+
provider.shutdown().unwrap();
328+
}
329+
330+
#[test]
331+
fn has_error_metric_returns_false_when_metric_is_histogram() {
332+
let exporter = InMemoryMetricExporter::default();
333+
let reader = PeriodicReader::builder(exporter.clone())
334+
.with_interval(Duration::from_millis(100))
335+
.build();
336+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
337+
let meter = provider.meter("test");
338+
let histogram = meter
339+
.f64_histogram("acp.errors.total")
340+
.with_description("test")
341+
.build();
342+
histogram.record(1.0, &[]);
343+
provider.force_flush().unwrap();
344+
let finished_metrics = exporter.get_finished_metrics().unwrap();
345+
assert!(!has_error_metric(
346+
&finished_metrics,
347+
"ext_notification",
348+
"ext_notification_publish_failed"
349+
));
350+
provider.shutdown().unwrap();
351+
}
352+
}

0 commit comments

Comments
 (0)