Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/agent/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
&self,
subject: &impl crate::nats::markers::SessionCommand,
args: &Req,
session_id: &str,
session_id: &crate::session_id::AcpSessionId,
) -> Result<Res>
where
Req: serde::Serialize,
Expand Down
6 changes: 1 addition & 5 deletions rsworkspace/crates/acp-nats/src/agent/close_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ where
let subject = session::agent::CloseSubject::new(prefix, &session_id);

let result = bridge
.session_request::<CloseSessionRequest, CloseSessionResponse>(
&subject,
&args,
session_id.as_str(),
)
.session_request::<CloseSessionRequest, CloseSessionResponse>(&subject, &args, &session_id)
.await;

bridge.metrics.record_request(
Expand Down
6 changes: 1 addition & 5 deletions rsworkspace/crates/acp-nats/src/agent/fork_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ where
let subject = session::agent::ForkSubject::new(prefix, &session_id);

let result = bridge
.session_request::<ForkSessionRequest, ForkSessionResponse>(
&subject,
&args,
session_id.as_str(),
)
.session_request::<ForkSessionRequest, ForkSessionResponse>(&subject, &args, &session_id)
.await;

if let Ok(ref response) = result {
Expand Down
29 changes: 17 additions & 12 deletions rsworkspace/crates/acp-nats/src/agent/js_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn js_request<J, Req, Res, S>(
request: &Req,
serializer: &S,
prefix: &AcpPrefix,
session_id: &str,
session_id: &crate::session_id::AcpSessionId,
req_id: &str,
operation_timeout: Duration,
) -> agent_client_protocol::Result<Res>
Expand Down Expand Up @@ -61,7 +61,7 @@ where

let mut headers = async_nats::HeaderMap::new();
headers.insert(REQ_ID_HEADER, req_id);
headers.insert(SESSION_ID_HEADER, session_id);
headers.insert(SESSION_ID_HEADER, session_id.as_str());

js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
.await
Expand Down Expand Up @@ -114,11 +114,16 @@ mod tests {
use trogon_nats::jetstream::mocks::*;

use crate::agent::test_support::MockJs;
use crate::session_id::AcpSessionId;

fn test_prefix() -> AcpPrefix {
AcpPrefix::new("acp").expect("test prefix")
}

fn test_sid(s: &str) -> AcpSessionId {
AcpSessionId::new(s).expect("test session id")
}

fn make_nats_msg(payload: &[u8]) -> async_nats::Message {
async_nats::Message {
subject: "test".into(),
Expand Down Expand Up @@ -147,7 +152,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -173,7 +178,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -193,7 +198,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -220,7 +225,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -245,7 +250,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -267,7 +272,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_millis(10),
)
Expand Down Expand Up @@ -304,7 +309,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -329,7 +334,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand Down Expand Up @@ -361,7 +366,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand All @@ -383,7 +388,7 @@ mod tests {
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::FailNextSerialize::new(1),
&test_prefix(),
"s1",
&test_sid("s1"),
"req-1",
Duration::from_secs(5),
)
Expand Down
6 changes: 1 addition & 5 deletions rsworkspace/crates/acp-nats/src/agent/load_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ where
let subject = session::agent::LoadSubject::new(prefix, &session_id);

let result = bridge
.session_request::<LoadSessionRequest, LoadSessionResponse>(
&subject,
&args,
session_id.as_str(),
)
.session_request::<LoadSessionRequest, LoadSessionResponse>(&subject, &args, &session_id)
.await;

if result.is_ok() {
Expand Down
7 changes: 3 additions & 4 deletions rsworkspace/crates/acp-nats/src/agent/prompt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ where
// Create consumers BEFORE publishing — same principle as subscribe-before-publish.
// JetStream consumers with DeliverAll replay from stream start, so they'll see the
// response even if the runner responds before we start consuming.
let sid = session_id.as_str();
let notifications_stream = streams::notifications_stream_name(prefix);
let notif_config = consumers::prompt_notifications_consumer(prefix, sid, req_id);
let notif_config = consumers::prompt_notifications_consumer(prefix, session_id, req_id);
let notif_stream = js.get_stream(&notifications_stream).await.map_err(|e| {
Error::new(
ErrorCode::InternalError.into(),
Expand All @@ -112,7 +111,7 @@ where
})?;

let responses_stream = streams::responses_stream_name(prefix);
let resp_config = consumers::prompt_response_consumer(prefix, sid, req_id);
let resp_config = consumers::prompt_response_consumer(prefix, session_id, req_id);
let resp_stream = js.get_stream(&responses_stream).await.map_err(|e| {
Error::new(
ErrorCode::InternalError.into(),
Expand Down Expand Up @@ -154,7 +153,7 @@ where

let mut headers = async_nats::HeaderMap::new();
headers.insert(REQ_ID_HEADER, req_id);
headers.insert(SESSION_ID_HEADER, sid);
headers.insert(SESSION_ID_HEADER, session_id.as_str());

let prompt_subject = session::agent::PromptSubject::new(prefix, session_id);
js.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/agent/resume_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
.session_request::<ResumeSessionRequest, ResumeSessionResponse>(
&subject,
&args,
session_id.as_str(),
&session_id,
)
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
.session_request::<SetSessionConfigOptionRequest, SetSessionConfigOptionResponse>(
&subject,
&args,
session_id.as_str(),
&session_id,
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/agent/set_session_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
.session_request::<SetSessionModeRequest, SetSessionModeResponse>(
&subject,
&args,
session_id.as_str(),
&session_id,
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/agent/set_session_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
.session_request::<SetSessionModelRequest, SetSessionModelResponse>(
&subject,
&args,
session_id.as_str(),
&session_id,
)
.await;

Expand Down
42 changes: 29 additions & 13 deletions rsworkspace/crates/acp-nats/src/jetstream/consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,45 @@ use async_nats::jetstream::consumer::pull::Config;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy};

use crate::acp_prefix::AcpPrefix;
use crate::session_id::AcpSessionId;

pub fn prompt_notifications_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
pub fn prompt_notifications_consumer(
prefix: &AcpPrefix,
session_id: &AcpSessionId,
req_id: &str,
) -> Config {
let pfx = prefix.as_str();
let sid = session_id.as_str();
Config {
filter_subject: format!("{pfx}.session.{session_id}.agent.update.{req_id}"),
filter_subject: format!("{pfx}.session.{sid}.agent.update.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
..Default::default()
}
}

pub fn prompt_response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
pub fn prompt_response_consumer(
prefix: &AcpPrefix,
session_id: &AcpSessionId,
req_id: &str,
) -> Config {
let pfx = prefix.as_str();
let sid = session_id.as_str();
Config {
filter_subject: format!("{pfx}.session.{session_id}.agent.prompt.response.{req_id}"),
filter_subject: format!("{pfx}.session.{sid}.agent.prompt.response.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
..Default::default()
}
}

pub fn response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
pub fn response_consumer(prefix: &AcpPrefix, session_id: &AcpSessionId, req_id: &str) -> Config {
let pfx = prefix.as_str();
let sid = session_id.as_str();
Config {
filter_subject: format!("{pfx}.session.{session_id}.agent.response.{req_id}"),
filter_subject: format!("{pfx}.session.{sid}.agent.response.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
Expand Down Expand Up @@ -57,9 +69,13 @@ mod tests {
AcpPrefix::new(s).expect("test prefix")
}

fn sid(s: &str) -> AcpSessionId {
AcpSessionId::new(s).expect("test session id")
}

#[test]
fn prompt_notifications_consumer_filter() {
let config = prompt_notifications_consumer(&p("acp"), "sess-1", "req-abc");
let config = prompt_notifications_consumer(&p("acp"), &sid("sess-1"), "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.update.req-abc"
Expand All @@ -68,15 +84,15 @@ mod tests {

#[test]
fn prompt_notifications_consumer_delivers_all() {
let config = prompt_notifications_consumer(&p("acp"), "s1", "r1");
let config = prompt_notifications_consumer(&p("acp"), &sid("s1"), "r1");
assert_eq!(config.deliver_policy, DeliverPolicy::All);
assert_eq!(config.ack_policy, AckPolicy::Explicit);
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
}

#[test]
fn prompt_response_consumer_filter() {
let config = prompt_response_consumer(&p("acp"), "sess-1", "req-abc");
let config = prompt_response_consumer(&p("acp"), &sid("sess-1"), "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.prompt.response.req-abc"
Expand All @@ -98,7 +114,7 @@ mod tests {

#[test]
fn response_consumer_filter() {
let config = response_consumer(&p("acp"), "sess-1", "req-abc");
let config = response_consumer(&p("acp"), &sid("sess-1"), "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.response.req-abc"
Expand All @@ -107,21 +123,21 @@ mod tests {

#[test]
fn response_consumer_delivers_all() {
let config = response_consumer(&p("acp"), "s1", "r1");
let config = response_consumer(&p("acp"), &sid("s1"), "r1");
assert_eq!(config.deliver_policy, DeliverPolicy::All);
assert_eq!(config.ack_policy, AckPolicy::Explicit);
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
}

#[test]
fn response_consumer_custom_prefix() {
let config = response_consumer(&p("myapp"), "s1", "r1");
let config = response_consumer(&p("myapp"), &sid("s1"), "r1");
assert_eq!(config.filter_subject, "myapp.session.s1.agent.response.r1");
}

#[test]
fn custom_prefix_in_consumers() {
let config = prompt_response_consumer(&p("myapp"), "s1", "r1");
let config = prompt_response_consumer(&p("myapp"), &sid("s1"), "r1");
assert_eq!(
config.filter_subject,
"myapp.session.s1.agent.prompt.response.r1"
Expand Down
Loading