Skip to content

Commit 97cb312

Browse files
committed
feat(acp-nats): add load_session handler
- Add load_session handler with session validation and session.ready lifecycle
- Add validate_session_id to config for session-scoped subject validation
- Add session_load subject for {prefix}.{session_id}.agent.session.load
- Add Bridge::validate_session for pre-flight session ID checks
- Tests: happy path, NATS failure, invalid JSON, map_*_error branches, metrics

Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent af437c6 commit 97cb312

4 files changed

Lines changed: 403 additions & 15 deletions

File tree

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
use super::Bridge;
2+
use crate::error::AGENT_UNAVAILABLE;
3+
use crate::nats::{
4+
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
5+
RetryPolicy, agent,
6+
};
7+
use crate::telemetry::metrics::Metrics;
8+
use agent_client_protocol::{
9+
Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result, SessionId,
10+
};
11+
use std::time::Duration;
12+
use tracing::{info, instrument, warn};
13+
use trogon_nats::NatsError;
14+
use trogon_std::time::GetElapsed;
15+
16+
/// Pause observed by `publish_session_ready` before it publishes `session.ready`.
// NOTE(review): presumably gives the client time to receive the load_session
// response before the ready notification arrives — confirm the intended ordering.
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
17+
18+
fn map_load_session_error(e: NatsError) -> Error {
19+
match &e {
20+
NatsError::Timeout { subject } => {
21+
warn!(subject = %subject, "load_session request timed out");
22+
Error::new(
23+
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
24+
"Load session request timed out; agent may be overloaded or unavailable",
25+
)
26+
}
27+
NatsError::Request { subject, error } => {
28+
warn!(subject = %subject, error = %error, "load_session NATS request failed");
29+
Error::new(
30+
ErrorCode::Other(AGENT_UNAVAILABLE).into(),
31+
format!("Agent unavailable: {}", error),
32+
)
33+
}
34+
NatsError::Serialize(inner) => {
35+
warn!(error = %inner, "failed to serialize load_session request");
36+
Error::new(
37+
ErrorCode::InternalError.into(),
38+
format!("Failed to serialize load_session request: {}", inner),
39+
)
40+
}
41+
NatsError::Deserialize(inner) => {
42+
warn!(error = %inner, "failed to deserialize load_session response");
43+
Error::new(
44+
ErrorCode::InternalError.into(),
45+
"Invalid response from agent",
46+
)
47+
}
48+
_ => {
49+
warn!(error = %e, "load_session NATS request failed");
50+
Error::new(
51+
ErrorCode::InternalError.into(),
52+
"Load session request failed",
53+
)
54+
}
55+
}
56+
}
57+
58+
#[instrument(
59+
name = "acp.session.load",
60+
skip(bridge, args),
61+
fields(session_id = %args.session_id)
62+
)]
63+
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
64+
bridge: &Bridge<N, C>,
65+
args: LoadSessionRequest,
66+
) -> Result<LoadSessionResponse> {
67+
let start = bridge.clock.now();
68+
69+
info!(session_id = %args.session_id, "Load session request");
70+
71+
bridge.validate_session(&args.session_id)?;
72+
let nats = bridge.nats();
73+
let subject = agent::session_load(bridge.config.acp_prefix(), &args.session_id.to_string());
74+
75+
let result = nats::request_with_timeout::<N, LoadSessionRequest, LoadSessionResponse>(
76+
nats,
77+
&subject,
78+
&args,
79+
bridge.config.operation_timeout,
80+
)
81+
.await
82+
.map_err(map_load_session_error);
83+
84+
if let Ok(ref _response) = result {
85+
let nats = bridge.nats.clone();
86+
let prefix = bridge.config.acp_prefix().to_string();
87+
let session_id = args.session_id.clone();
88+
let metrics = bridge.metrics.clone();
89+
tokio::spawn(async move {
90+
publish_session_ready(&nats, &prefix, &session_id, &metrics).await;
91+
});
92+
}
93+
94+
bridge.metrics.record_request(
95+
"load_session",
96+
bridge.clock.elapsed(start).as_secs_f64(),
97+
result.is_ok(),
98+
);
99+
100+
result
101+
}
102+
103+
async fn publish_session_ready<N: PublishClient + FlushClient>(
104+
nats: &N,
105+
prefix: &str,
106+
session_id: &SessionId,
107+
metrics: &Metrics,
108+
) {
109+
tokio::time::sleep(SESSION_READY_DELAY).await;
110+
111+
let subject = agent::ext_session_ready(prefix, &session_id.to_string());
112+
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
113+
114+
let message = ExtSessionReady::new(session_id.clone());
115+
116+
let options = PublishOptions::builder()
117+
.publish_retry_policy(RetryPolicy::standard())
118+
.flush_policy(FlushPolicy::standard())
119+
.build();
120+
121+
if let Err(e) = nats::publish(nats, &subject, &message, options).await {
122+
warn!(
123+
error = %e,
124+
session_id = %session_id,
125+
"Failed to publish session.ready"
126+
);
127+
metrics.record_error("session_ready", "session_ready_publish_failed");
128+
} else {
129+
info!(session_id = %session_id, "Published session.ready");
130+
}
131+
}
132+
133+
#[cfg(test)]
mod tests {
    use super::{Bridge, map_load_session_error};
    use crate::config::Config;
    use crate::error::AGENT_UNAVAILABLE;
    use agent_client_protocol::{Agent, ErrorCode, LoadSessionRequest, LoadSessionResponse};
    use opentelemetry::Value;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
    use opentelemetry_sdk::metrics::{
        PeriodicReader, SdkMeterProvider, in_memory_exporter::InMemoryMetricExporter,
    };
    use std::time::Duration;
    use trogon_nats::{AdvancedMockNatsClient, NatsError};

    /// Subject used throughout these tests: session `s1` under the `acp` prefix.
    const LOAD_SUBJECT: &str = "acp.s1.agent.session.load";

    /// Bridge flavour exercised by every test in this module.
    type TestBridge = Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock>;

    /// Builds the request every test sends: session `s1`, working directory `.`.
    fn load_request() -> LoadSessionRequest {
        LoadSessionRequest::new("s1", ".")
    }

    /// Wires a bridge to a clone of `mock`, reporting metrics through `meter`.
    fn bridge_over(
        mock: &AdvancedMockNatsClient,
        meter: &opentelemetry::metrics::Meter,
    ) -> TestBridge {
        Bridge::new(
            mock.clone(),
            trogon_std::time::SystemClock,
            meter,
            Config::for_test("acp"),
        )
    }

    /// Asserts that the exported metrics contain an `acp.request.count` data point
    /// tagged `method=load_session` and `success=<expected_success>`.
    fn assert_load_session_metric_recorded(
        finished_metrics: &[opentelemetry_sdk::metrics::data::ResourceMetrics],
        expected_success: bool,
    ) {
        let recorded = finished_metrics
            .iter()
            .flat_map(|resource| resource.scope_metrics())
            .flat_map(|scope| scope.metrics())
            .filter(|metric| metric.name() == "acp.request.count")
            .any(|metric| {
                // Only a u64 sum can carry the request counter.
                let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
                    return false;
                };
                sum.data_points().any(|point| {
                    // Fold keeps "last attribute with that key wins", like a flag loop would.
                    let (method_ok, success_ok) = point.attributes().fold(
                        (false, false),
                        |(method_ok, success_ok), attr| match attr.key.as_str() {
                            "method" => (attr.value.as_str() == "load_session", success_ok),
                            "success" => {
                                (method_ok, attr.value == Value::from(expected_success))
                            }
                            _ => (method_ok, success_ok),
                        },
                    );
                    method_ok && success_ok
                })
            });

        assert!(
            recorded,
            "expected acp.request.count datapoint with method=load_session, success={}",
            expected_success
        );
    }

    /// Returns a mock client, a bridge over it, and the in-memory exporter/provider
    /// pair needed to inspect the metrics the bridge records.
    fn mock_bridge_with_metrics() -> (
        AdvancedMockNatsClient,
        TestBridge,
        InMemoryMetricExporter,
        SdkMeterProvider,
    ) {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_millis(100))
            .build();
        let provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = provider.meter("acp-nats-test");

        let mock = AdvancedMockNatsClient::new();
        let bridge = bridge_over(&mock, &meter);
        (mock, bridge, exporter, provider)
    }

    /// Returns a mock client and a bridge over it, using the global meter.
    fn mock_bridge() -> (AdvancedMockNatsClient, TestBridge) {
        let mock = AdvancedMockNatsClient::new();
        let bridge = bridge_over(&mock, &opentelemetry::global::meter("acp-nats-test"));
        (mock, bridge)
    }

    /// Registers `resp`, serialized as JSON, as the mock's reply on `subject`.
    fn set_json_response<T: serde::Serialize>(
        mock: &AdvancedMockNatsClient,
        subject: &str,
        resp: &T,
    ) {
        let encoded = serde_json::to_vec(resp).unwrap();
        mock.set_response(subject, encoded.into());
    }

    /// Value whose serialization always fails; used to obtain a real serde error.
    struct FailsSerialize;

    impl serde::Serialize for FailsSerialize {
        fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            Err(serde::ser::Error::custom("test serialize failure"))
        }
    }

    #[tokio::test]
    async fn load_session_forwards_request_and_returns_response() {
        let (mock, bridge) = mock_bridge();
        set_json_response(&mock, LOAD_SUBJECT, &LoadSessionResponse::new());

        let response = bridge.load_session(load_request()).await;

        assert!(response.is_ok());
    }

    #[tokio::test]
    async fn load_session_returns_error_when_nats_request_fails() {
        let (mock, bridge) = mock_bridge();
        mock.fail_next_request();

        let failure = bridge.load_session(load_request()).await.unwrap_err();

        assert!(failure.to_string().contains("Agent unavailable"));
        assert_eq!(failure.code, ErrorCode::Other(AGENT_UNAVAILABLE));
    }

    #[tokio::test]
    async fn load_session_returns_error_when_response_is_invalid_json() {
        let (mock, bridge) = mock_bridge();
        mock.set_response(LOAD_SUBJECT, "not json".into());

        let failure = bridge.load_session(load_request()).await.unwrap_err();

        assert!(failure.to_string().contains("Invalid response from agent"));
        assert_eq!(failure.code, ErrorCode::InternalError);
    }

    #[tokio::test]
    async fn load_session_records_metrics_on_success() {
        let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
        set_json_response(&mock, LOAD_SUBJECT, &LoadSessionResponse::new());

        let _ = bridge.load_session(load_request()).await;

        provider.force_flush().unwrap();
        let exported = exporter.get_finished_metrics().unwrap();
        assert_load_session_metric_recorded(&exported, true);
        provider.shutdown().unwrap();
    }

    #[tokio::test]
    async fn load_session_records_metrics_on_failure() {
        let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
        mock.fail_next_request();

        let _ = bridge.load_session(load_request()).await;

        provider.force_flush().unwrap();
        let exported = exporter.get_finished_metrics().unwrap();
        assert_load_session_metric_recorded(&exported, false);
        provider.shutdown().unwrap();
    }

    #[test]
    fn map_load_session_error_timeout() {
        let mapped = map_load_session_error(NatsError::Timeout {
            subject: LOAD_SUBJECT.into(),
        });

        assert!(mapped.to_string().contains("timed out"));
        assert_eq!(mapped.code, ErrorCode::Other(AGENT_UNAVAILABLE));
    }

    #[test]
    fn map_load_session_error_request() {
        let mapped = map_load_session_error(NatsError::Request {
            subject: LOAD_SUBJECT.into(),
            error: "connection refused".into(),
        });

        assert!(mapped.to_string().contains("Agent unavailable"));
        assert_eq!(mapped.code, ErrorCode::Other(AGENT_UNAVAILABLE));
    }

    #[test]
    fn map_load_session_error_serialize() {
        let serde_failure = serde_json::to_vec(&FailsSerialize).unwrap_err();

        let mapped = map_load_session_error(NatsError::Serialize(serde_failure));

        assert!(mapped.to_string().contains("serialize"));
        assert_eq!(mapped.code, ErrorCode::InternalError);
    }

    #[test]
    fn map_load_session_error_deserialize() {
        let serde_failure = serde_json::from_str::<LoadSessionResponse>("[]").unwrap_err();

        let mapped = map_load_session_error(NatsError::Deserialize(serde_failure));

        assert!(mapped.to_string().contains("Invalid response from agent"));
        assert_eq!(mapped.code, ErrorCode::InternalError);
    }

    #[test]
    fn map_load_session_error_other() {
        let mapped = map_load_session_error(NatsError::Other("misc failure".into()));

        assert!(mapped.to_string().contains("Load session request failed"));
        assert_eq!(mapped.code, ErrorCode::InternalError);
    }

    #[tokio::test]
    async fn load_session_validates_session_id() {
        let (_mock, bridge) = mock_bridge();

        let failure = bridge
            .load_session(LoadSessionRequest::new("invalid.session.id", "."))
            .await
            .unwrap_err();

        assert!(failure.to_string().contains("Invalid session ID"));
    }
}

0 commit comments

Comments
 (0)