diff --git a/rsworkspace/crates/acp-nats/src/agent/bridge.rs b/rsworkspace/crates/acp-nats/src/agent/bridge.rs index 2ae7baf8b..f38303636 100644 --- a/rsworkspace/crates/acp-nats/src/agent/bridge.rs +++ b/rsworkspace/crates/acp-nats/src/agent/bridge.rs @@ -146,7 +146,7 @@ where &self, subject: &impl crate::nats::markers::SessionCommand, args: &Req, - session_id: &str, + session_id: &crate::session_id::AcpSessionId, ) -> Result where Req: serde::Serialize, diff --git a/rsworkspace/crates/acp-nats/src/agent/close_session.rs b/rsworkspace/crates/acp-nats/src/agent/close_session.rs index 83b3b8c56..5beaeef10 100644 --- a/rsworkspace/crates/acp-nats/src/agent/close_session.rs +++ b/rsworkspace/crates/acp-nats/src/agent/close_session.rs @@ -39,11 +39,7 @@ where let subject = session::agent::CloseSubject::new(prefix, &session_id); let result = bridge - .session_request::( - &subject, - &args, - session_id.as_str(), - ) + .session_request::(&subject, &args, &session_id) .await; bridge.metrics.record_request( diff --git a/rsworkspace/crates/acp-nats/src/agent/fork_session.rs b/rsworkspace/crates/acp-nats/src/agent/fork_session.rs index 42d3683be..9027ca493 100644 --- a/rsworkspace/crates/acp-nats/src/agent/fork_session.rs +++ b/rsworkspace/crates/acp-nats/src/agent/fork_session.rs @@ -39,11 +39,7 @@ where let subject = session::agent::ForkSubject::new(prefix, &session_id); let result = bridge - .session_request::( - &subject, - &args, - session_id.as_str(), - ) + .session_request::(&subject, &args, &session_id) .await; if let Ok(ref response) = result { diff --git a/rsworkspace/crates/acp-nats/src/agent/js_request.rs b/rsworkspace/crates/acp-nats/src/agent/js_request.rs index 93d1f959b..b7b8045d5 100644 --- a/rsworkspace/crates/acp-nats/src/agent/js_request.rs +++ b/rsworkspace/crates/acp-nats/src/agent/js_request.rs @@ -23,7 +23,7 @@ pub async fn js_request( request: &Req, serializer: &S, prefix: &AcpPrefix, - session_id: &str, + session_id: &crate::session_id::AcpSessionId, req_id: &str, operation_timeout: Duration, ) -> agent_client_protocol::Result @@ -61,7 +61,7 @@ where let mut headers = async_nats::HeaderMap::new(); headers.insert(REQ_ID_HEADER, req_id); - headers.insert(SESSION_ID_HEADER, session_id); + headers.insert(SESSION_ID_HEADER, session_id.as_str()); js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes)) .await @@ -114,11 +114,16 @@ mod tests { use trogon_nats::jetstream::mocks::*; use crate::agent::test_support::MockJs; + use crate::session_id::AcpSessionId; fn test_prefix() -> AcpPrefix { AcpPrefix::new("acp").expect("test prefix") } + fn test_sid(s: &str) -> AcpSessionId { + AcpSessionId::new(s).expect("test session id") + } + fn make_nats_msg(payload: &[u8]) -> async_nats::Message { async_nats::Message { subject: "test".into(), @@ -147,7 +152,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -173,7 +178,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -193,7 +198,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -220,7 +225,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -245,7 +250,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -267,7 +272,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_millis(10), ) @@ -304,7 +309,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -329,7 +334,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -361,7 +366,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::StdJsonSerialize, &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) @@ -383,7 +388,7 @@ mod tests { &agent_client_protocol::PromptRequest::new("s1", vec![]), &trogon_std::FailNextSerialize::new(1), &test_prefix(), - "s1", + &test_sid("s1"), "req-1", Duration::from_secs(5), ) diff --git a/rsworkspace/crates/acp-nats/src/agent/load_session.rs b/rsworkspace/crates/acp-nats/src/agent/load_session.rs index e60292831..08e612410 100644 --- a/rsworkspace/crates/acp-nats/src/agent/load_session.rs +++ b/rsworkspace/crates/acp-nats/src/agent/load_session.rs @@ -39,11 +39,7 @@ where let subject = session::agent::LoadSubject::new(prefix, &session_id); let result = bridge - .session_request::( - &subject, - &args, - session_id.as_str(), - ) + .session_request::(&subject, &args, &session_id) .await; if result.is_ok() { diff --git a/rsworkspace/crates/acp-nats/src/agent/prompt.rs b/rsworkspace/crates/acp-nats/src/agent/prompt.rs index 79ccdea1c..7ebb57f10 100644 --- a/rsworkspace/crates/acp-nats/src/agent/prompt.rs +++ b/rsworkspace/crates/acp-nats/src/agent/prompt.rs @@ -86,9 +86,8 @@ where // Create consumers BEFORE publishing — same principle as subscribe-before-publish. // JetStream consumers with DeliverAll replay from stream start, so they'll see the // response even if the runner responds before we start consuming. - let sid = session_id.as_str(); let notifications_stream = streams::notifications_stream_name(prefix); - let notif_config = consumers::prompt_notifications_consumer(prefix, sid, req_id); + let notif_config = consumers::prompt_notifications_consumer(prefix, session_id, req_id); let notif_stream = js.get_stream(¬ifications_stream).await.map_err(|e| { Error::new( ErrorCode::InternalError.into(), @@ -112,7 +111,7 @@ where })?; let responses_stream = streams::responses_stream_name(prefix); - let resp_config = consumers::prompt_response_consumer(prefix, sid, req_id); + let resp_config = consumers::prompt_response_consumer(prefix, session_id, req_id); let resp_stream = js.get_stream(&responses_stream).await.map_err(|e| { Error::new( ErrorCode::InternalError.into(), @@ -154,7 +153,7 @@ where let mut headers = async_nats::HeaderMap::new(); headers.insert(REQ_ID_HEADER, req_id); - headers.insert(SESSION_ID_HEADER, sid); + headers.insert(SESSION_ID_HEADER, session_id.as_str()); let prompt_subject = session::agent::PromptSubject::new(prefix, session_id); js.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes)) diff --git a/rsworkspace/crates/acp-nats/src/agent/resume_session.rs b/rsworkspace/crates/acp-nats/src/agent/resume_session.rs index 94d26e5a0..a88225c90 100644 --- a/rsworkspace/crates/acp-nats/src/agent/resume_session.rs +++ b/rsworkspace/crates/acp-nats/src/agent/resume_session.rs @@ -44,7 +44,7 @@ where .session_request::( &subject, &args, - session_id.as_str(), + &session_id, ) .await; diff --git a/rsworkspace/crates/acp-nats/src/agent/set_session_config_option.rs b/rsworkspace/crates/acp-nats/src/agent/set_session_config_option.rs index 5231fe146..2cb8b435d 100644 --- a/rsworkspace/crates/acp-nats/src/agent/set_session_config_option.rs +++ b/rsworkspace/crates/acp-nats/src/agent/set_session_config_option.rs @@ -44,7 +44,7 @@ where .session_request::( &subject, &args, - session_id.as_str(), + &session_id, ) .await; diff --git a/rsworkspace/crates/acp-nats/src/agent/set_session_mode.rs b/rsworkspace/crates/acp-nats/src/agent/set_session_mode.rs index 0585ce7bf..1587dc609 100644 --- a/rsworkspace/crates/acp-nats/src/agent/set_session_mode.rs +++ b/rsworkspace/crates/acp-nats/src/agent/set_session_mode.rs @@ -44,7 +44,7 @@ where .session_request::( &subject, &args, - session_id.as_str(), + &session_id, ) .await; diff --git a/rsworkspace/crates/acp-nats/src/agent/set_session_model.rs b/rsworkspace/crates/acp-nats/src/agent/set_session_model.rs index 8189b1515..f281a7144 100644 --- a/rsworkspace/crates/acp-nats/src/agent/set_session_model.rs +++ b/rsworkspace/crates/acp-nats/src/agent/set_session_model.rs @@ -44,7 +44,7 @@ where .session_request::( &subject, &args, - session_id.as_str(), + &session_id, ) .await; diff --git a/rsworkspace/crates/acp-nats/src/jetstream/consumers.rs b/rsworkspace/crates/acp-nats/src/jetstream/consumers.rs index 5e4a19a77..8364daaeb 100644 --- a/rsworkspace/crates/acp-nats/src/jetstream/consumers.rs +++ b/rsworkspace/crates/acp-nats/src/jetstream/consumers.rs @@ -2,11 +2,17 @@ use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy}; use crate::acp_prefix::AcpPrefix; +use crate::session_id::AcpSessionId; -pub fn prompt_notifications_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config { +pub fn prompt_notifications_consumer( + prefix: &AcpPrefix, + session_id: &AcpSessionId, + req_id: &str, +) -> Config { let pfx = prefix.as_str(); + let sid = session_id.as_str(); Config { - filter_subject: format!("{pfx}.session.{session_id}.agent.update.{req_id}"), + filter_subject: format!("{pfx}.session.{sid}.agent.update.{req_id}"), deliver_policy: DeliverPolicy::All, ack_policy: AckPolicy::Explicit, replay_policy: ReplayPolicy::Instant, @@ -14,10 +20,15 @@ pub fn prompt_notifications_consumer(prefix: &AcpPrefix, session_id: &str, req_i } } -pub fn prompt_response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config { +pub fn prompt_response_consumer( + prefix: &AcpPrefix, + session_id: &AcpSessionId, + req_id: &str, +) -> Config { let pfx = prefix.as_str(); + let sid = session_id.as_str(); Config { - filter_subject: format!("{pfx}.session.{session_id}.agent.prompt.response.{req_id}"), + filter_subject: format!("{pfx}.session.{sid}.agent.prompt.response.{req_id}"), deliver_policy: DeliverPolicy::All, ack_policy: AckPolicy::Explicit, replay_policy: ReplayPolicy::Instant, @@ -25,10 +36,11 @@ pub fn prompt_response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &s } } -pub fn response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config { +pub fn response_consumer(prefix: &AcpPrefix, session_id: &AcpSessionId, req_id: &str) -> Config { let pfx = prefix.as_str(); + let sid = session_id.as_str(); Config { - filter_subject: format!("{pfx}.session.{session_id}.agent.response.{req_id}"), + filter_subject: format!("{pfx}.session.{sid}.agent.response.{req_id}"), deliver_policy: DeliverPolicy::All, ack_policy: AckPolicy::Explicit, replay_policy: ReplayPolicy::Instant, @@ -57,9 +69,13 @@ mod tests { AcpPrefix::new(s).expect("test prefix") } + fn sid(s: &str) -> AcpSessionId { + AcpSessionId::new(s).expect("test session id") + } + #[test] fn prompt_notifications_consumer_filter() { - let config = prompt_notifications_consumer(&p("acp"), "sess-1", "req-abc"); + let config = prompt_notifications_consumer(&p("acp"), &sid("sess-1"), "req-abc"); assert_eq!( config.filter_subject, "acp.session.sess-1.agent.update.req-abc" @@ -68,7 +84,7 @@ mod tests { #[test] fn prompt_notifications_consumer_delivers_all() { - let config = prompt_notifications_consumer(&p("acp"), "s1", "r1"); + let config = prompt_notifications_consumer(&p("acp"), &sid("s1"), "r1"); assert_eq!(config.deliver_policy, DeliverPolicy::All); assert_eq!(config.ack_policy, AckPolicy::Explicit); assert_eq!(config.replay_policy, ReplayPolicy::Instant); @@ -76,7 +92,7 @@ mod tests { #[test] fn prompt_response_consumer_filter() { - let config = prompt_response_consumer(&p("acp"), "sess-1", "req-abc"); + let config = prompt_response_consumer(&p("acp"), &sid("sess-1"), "req-abc"); assert_eq!( config.filter_subject, "acp.session.sess-1.agent.prompt.response.req-abc" @@ -98,7 +114,7 @@ mod tests { #[test] fn response_consumer_filter() { - let config = response_consumer(&p("acp"), "sess-1", "req-abc"); + let config = response_consumer(&p("acp"), &sid("sess-1"), "req-abc"); assert_eq!( config.filter_subject, "acp.session.sess-1.agent.response.req-abc" @@ -107,7 +123,7 @@ mod tests { #[test] fn response_consumer_delivers_all() { - let config = response_consumer(&p("acp"), "s1", "r1"); + let config = response_consumer(&p("acp"), &sid("s1"), "r1"); assert_eq!(config.deliver_policy, DeliverPolicy::All); assert_eq!(config.ack_policy, AckPolicy::Explicit); assert_eq!(config.replay_policy, ReplayPolicy::Instant); @@ -115,13 +131,13 @@ mod tests { #[test] fn response_consumer_custom_prefix() { - let config = response_consumer(&p("myapp"), "s1", "r1"); + let config = response_consumer(&p("myapp"), &sid("s1"), "r1"); assert_eq!(config.filter_subject, "myapp.session.s1.agent.response.r1"); } #[test] fn custom_prefix_in_consumers() { - let config = prompt_response_consumer(&p("myapp"), "s1", "r1"); + let config = prompt_response_consumer(&p("myapp"), &sid("s1"), "r1"); assert_eq!( config.filter_subject, "myapp.session.s1.agent.prompt.response.r1"