Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats-agent/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ where
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
A: Agent + 'static,
{
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix.as_str());
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix);
let config = acp_nats::jetstream::consumers::commands_observer();

info!(stream = %stream_name, "Starting JetStream consumer for COMMANDS stream");
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/acp_prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
//! malformed dots (consecutive, leading, trailing). Max 128 bytes. Validity is guaranteed at
//! construction.

use trogon_nats::SubjectTokenViolation;
use trogon_nats::DottedNatsToken;
use trogon_nats::SubjectTokenViolation;

/// Error returned when [`AcpPrefix`] validation fails.
#[derive(Debug, Clone, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/agent/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ where
&subject_str,
args,
&trogon_std::StdJsonSerialize,
self.config.acp_prefix(),
self.config.acp_prefix_ref(),
session_id,
&req_id,
self.config.operation_timeout,
Expand Down
27 changes: 16 additions & 11 deletions rsworkspace/crates/acp-nats/src/agent/js_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use trogon_nats::jetstream::{
};
use trogon_std::JsonSerialize;

use crate::acp_prefix::AcpPrefix;
use crate::constants::SESSION_ID_HEADER;
use crate::jetstream::{consumers, streams};

Expand All @@ -21,7 +22,7 @@ pub async fn js_request<J, Req, Res, S>(
subject: &str,
request: &Req,
serializer: &S,
prefix: &str,
prefix: &AcpPrefix,
session_id: &str,
req_id: &str,
operation_timeout: Duration,
Expand Down Expand Up @@ -114,6 +115,10 @@ mod tests {

use crate::agent::test_support::MockJs;

fn test_prefix() -> AcpPrefix {
AcpPrefix::new("acp").expect("test prefix")
}

fn make_nats_msg(payload: &[u8]) -> async_nats::Message {
async_nats::Message {
subject: "test".into(),
Expand Down Expand Up @@ -141,7 +146,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -167,7 +172,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -187,7 +192,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -214,7 +219,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -239,7 +244,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -261,7 +266,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_millis(10),
Expand Down Expand Up @@ -298,7 +303,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -323,7 +328,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand Down Expand Up @@ -355,7 +360,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::StdJsonSerialize,
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand All @@ -377,7 +382,7 @@ mod tests {
"acp.session.s1.agent.prompt",
&agent_client_protocol::PromptRequest::new("s1", vec![]),
&trogon_std::FailNextSerialize::new(1),
"acp",
&test_prefix(),
"s1",
"req-1",
Duration::from_secs(5),
Expand Down
17 changes: 8 additions & 9 deletions rsworkspace/crates/acp-nats/src/agent/prompt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ where
continue;
}
};
if bridge.notification_sender.send(notification).await.is_err() {
let _ = bridge.notification_sender.send(notification).await.inspect_err(|_| {
warn!("notification receiver dropped; continuing prompt");
}
});
}
resp = timeout(op_timeout, response_sub.next()) => {
match resp {
Expand Down Expand Up @@ -206,9 +206,8 @@ where
// JetStream consumers with DeliverAll replay from stream start, so they'll see the
// response even if the runner responds before we start consuming.
let sid = session_id.as_str();
let pfx = prefix.as_str();
let notifications_stream = streams::notifications_stream_name(pfx);
let notif_config = consumers::prompt_notifications_consumer(pfx, sid, req_id);
let notifications_stream = streams::notifications_stream_name(prefix);
let notif_config = consumers::prompt_notifications_consumer(prefix, sid, req_id);
let notif_stream = js.get_stream(&notifications_stream).await.map_err(|e| {
Error::new(
ErrorCode::InternalError.into(),
Expand All @@ -231,8 +230,8 @@ where
)
})?;

let responses_stream = streams::responses_stream_name(pfx);
let resp_config = consumers::prompt_response_consumer(pfx, sid, req_id);
let responses_stream = streams::responses_stream_name(prefix);
let resp_config = consumers::prompt_response_consumer(prefix, sid, req_id);
let resp_stream = js.get_stream(&responses_stream).await.map_err(|e| {
Error::new(
ErrorCode::InternalError.into(),
Expand Down Expand Up @@ -313,9 +312,9 @@ where
}
};
let _ = js_msg.ack().await;
if bridge.notification_sender.send(notification).await.is_err() {
let _ = bridge.notification_sender.send(notification).await.inspect_err(|_| {
warn!("notification receiver dropped; continuing prompt");
}
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/acp-nats/src/ext_method_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
//! rejects `*`, `>`, whitespace; allows dotted namespaces (e.g. `vendor.operation`) but rejects
//! malformed dots (consecutive, leading, trailing). Validity is guaranteed at construction.

use trogon_nats::SubjectTokenViolation;
use trogon_nats::DottedNatsToken;
use trogon_nats::SubjectTokenViolation;

/// Error returned when [`ExtMethodName`] validation fails.
#[derive(Debug, Clone, PartialEq)]
Expand Down
35 changes: 22 additions & 13 deletions rsworkspace/crates/acp-nats/src/jetstream/consumers.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
use async_nats::jetstream::consumer::pull::Config;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy};

pub fn prompt_notifications_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
use crate::acp_prefix::AcpPrefix;

pub fn prompt_notifications_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
let pfx = prefix.as_str();
Config {
filter_subject: format!("{prefix}.session.{session_id}.agent.update.{req_id}"),
filter_subject: format!("{pfx}.session.{session_id}.agent.update.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
..Default::default()
}
}

pub fn prompt_response_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
pub fn prompt_response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
let pfx = prefix.as_str();
Config {
filter_subject: format!("{prefix}.session.{session_id}.agent.prompt.response.{req_id}"),
filter_subject: format!("{pfx}.session.{session_id}.agent.prompt.response.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
..Default::default()
}
}

pub fn response_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
pub fn response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
let pfx = prefix.as_str();
Config {
filter_subject: format!("{prefix}.session.{session_id}.agent.response.{req_id}"),
filter_subject: format!("{pfx}.session.{session_id}.agent.response.{req_id}"),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
replay_policy: ReplayPolicy::Instant,
Expand All @@ -48,9 +53,13 @@ pub fn commands_observer() -> Config {
mod tests {
use super::*;

fn p(s: &str) -> AcpPrefix {
AcpPrefix::new(s).expect("test prefix")
}

#[test]
fn prompt_notifications_consumer_filter() {
let config = prompt_notifications_consumer("acp", "sess-1", "req-abc");
let config = prompt_notifications_consumer(&p("acp"), "sess-1", "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.update.req-abc"
Expand All @@ -59,15 +68,15 @@ mod tests {

#[test]
fn prompt_notifications_consumer_delivers_all() {
let config = prompt_notifications_consumer("acp", "s1", "r1");
let config = prompt_notifications_consumer(&p("acp"), "s1", "r1");
assert_eq!(config.deliver_policy, DeliverPolicy::All);
assert_eq!(config.ack_policy, AckPolicy::Explicit);
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
}

#[test]
fn prompt_response_consumer_filter() {
let config = prompt_response_consumer("acp", "sess-1", "req-abc");
let config = prompt_response_consumer(&p("acp"), "sess-1", "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.prompt.response.req-abc"
Expand All @@ -89,7 +98,7 @@ mod tests {

#[test]
fn response_consumer_filter() {
let config = response_consumer("acp", "sess-1", "req-abc");
let config = response_consumer(&p("acp"), "sess-1", "req-abc");
assert_eq!(
config.filter_subject,
"acp.session.sess-1.agent.response.req-abc"
Expand All @@ -98,21 +107,21 @@ mod tests {

#[test]
fn response_consumer_delivers_all() {
let config = response_consumer("acp", "s1", "r1");
let config = response_consumer(&p("acp"), "s1", "r1");
assert_eq!(config.deliver_policy, DeliverPolicy::All);
assert_eq!(config.ack_policy, AckPolicy::Explicit);
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
}

#[test]
fn response_consumer_custom_prefix() {
let config = response_consumer("myapp", "s1", "r1");
let config = response_consumer(&p("myapp"), "s1", "r1");
assert_eq!(config.filter_subject, "myapp.session.s1.agent.response.r1");
}

#[test]
fn custom_prefix_in_consumers() {
let config = prompt_response_consumer("myapp", "s1", "r1");
let config = prompt_response_consumer(&p("myapp"), "s1", "r1");
assert_eq!(
config.filter_subject,
"myapp.session.s1.agent.prompt.response.r1"
Expand Down
19 changes: 12 additions & 7 deletions rsworkspace/crates/acp-nats/src/jetstream/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl std::error::Error for ProvisionError {}

pub async fn provision_streams<J: JetStreamContext>(
js: &J,
prefix: &str,
prefix: &crate::acp_prefix::AcpPrefix,
) -> Result<(), ProvisionError> {
for config in streams::all_configs(prefix) {
let name = config.name.clone();
Expand All @@ -31,19 +31,24 @@ pub async fn provision_streams<J: JetStreamContext>(
#[cfg(test)]
mod tests {
use super::*;
use crate::acp_prefix::AcpPrefix;
use trogon_nats::jetstream::MockJetStreamContext;

fn p(s: &str) -> AcpPrefix {
AcpPrefix::new(s).expect("test prefix")
}

#[tokio::test]
async fn provision_creates_six_streams() {
let ctx = MockJetStreamContext::new();
provision_streams(&ctx, "acp").await.unwrap();
provision_streams(&ctx, &p("acp")).await.unwrap();
assert_eq!(ctx.created_streams().len(), 6);
}

#[tokio::test]
async fn provision_creates_correct_stream_names() {
let ctx = MockJetStreamContext::new();
provision_streams(&ctx, "acp").await.unwrap();
provision_streams(&ctx, &p("acp")).await.unwrap();
let names: Vec<String> = ctx
.created_streams()
.iter()
Expand All @@ -60,7 +65,7 @@ mod tests {
#[tokio::test]
async fn provision_with_custom_prefix() {
let ctx = MockJetStreamContext::new();
provision_streams(&ctx, "myapp").await.unwrap();
provision_streams(&ctx, &p("myapp")).await.unwrap();
let names: Vec<String> = ctx
.created_streams()
.iter()
Expand All @@ -73,16 +78,16 @@ mod tests {
async fn provision_returns_error_on_failure() {
let ctx = MockJetStreamContext::new();
ctx.fail_next();
let result = provision_streams(&ctx, "acp").await;
let result = provision_streams(&ctx, &p("acp")).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("ACP_COMMANDS"));
}

#[tokio::test]
async fn provision_is_idempotent() {
let ctx = MockJetStreamContext::new();
provision_streams(&ctx, "acp").await.unwrap();
provision_streams(&ctx, "acp").await.unwrap();
provision_streams(&ctx, &p("acp")).await.unwrap();
provision_streams(&ctx, &p("acp")).await.unwrap();
assert_eq!(ctx.created_streams().len(), 12);
}
}
Loading
Loading