Skip to content

Commit 8f9785f

Browse files
committed
feat(acp-nats): AcpStream enum owns stream config and subject patterns
AcpStream is the single source of truth for stream names, subject patterns, and JetStream config. All stream functions and callers now accept &AcpPrefix instead of &str. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 84355c0 commit 8f9785f

51 files changed

Lines changed: 557 additions & 229 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ where
489489
<<J::Stream as trogon_nats::jetstream::JetStreamCreateConsumer>::Consumer as trogon_nats::jetstream::JetStreamConsumer>::Message: JsDispatchMessage,
490490
A: Agent + 'static,
491491
{
492-
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix.as_str());
492+
let stream_name = acp_nats::jetstream::streams::commands_stream_name(prefix);
493493
let config = acp_nats::jetstream::consumers::commands_observer();
494494

495495
info!(stream = %stream_name, "Starting JetStream consumer for COMMANDS stream");

rsworkspace/crates/acp-nats/src/acp_prefix.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
//! malformed dots (consecutive, leading, trailing). Max 128 bytes. Validity is guaranteed at
88
//! construction.
99
10-
use trogon_nats::SubjectTokenViolation;
1110
use trogon_nats::DottedNatsToken;
11+
use trogon_nats::SubjectTokenViolation;
1212

1313
/// Error returned when [`AcpPrefix`] validation fails.
1414
#[derive(Debug, Clone, PartialEq)]

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ where
184184
&subject_str,
185185
args,
186186
&trogon_std::StdJsonSerialize,
187-
self.config.acp_prefix(),
187+
self.config.acp_prefix_ref(),
188188
session_id,
189189
&req_id,
190190
self.config.operation_timeout,

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use trogon_nats::jetstream::{
1212
};
1313
use trogon_std::JsonSerialize;
1414

15+
use crate::acp_prefix::AcpPrefix;
1516
use crate::constants::SESSION_ID_HEADER;
1617
use crate::jetstream::{consumers, streams};
1718

@@ -21,7 +22,7 @@ pub async fn js_request<J, Req, Res, S>(
2122
subject: &str,
2223
request: &Req,
2324
serializer: &S,
24-
prefix: &str,
25+
prefix: &AcpPrefix,
2526
session_id: &str,
2627
req_id: &str,
2728
operation_timeout: Duration,
@@ -114,6 +115,10 @@ mod tests {
114115

115116
use crate::agent::test_support::MockJs;
116117

118+
fn test_prefix() -> AcpPrefix {
119+
AcpPrefix::new("acp").expect("test prefix")
120+
}
121+
117122
fn make_nats_msg(payload: &[u8]) -> async_nats::Message {
118123
async_nats::Message {
119124
subject: "test".into(),
@@ -141,7 +146,7 @@ mod tests {
141146
"acp.session.s1.agent.prompt",
142147
&agent_client_protocol::PromptRequest::new("s1", vec![]),
143148
&trogon_std::StdJsonSerialize,
144-
"acp",
149+
&test_prefix(),
145150
"s1",
146151
"req-1",
147152
Duration::from_secs(5),
@@ -167,7 +172,7 @@ mod tests {
167172
"acp.session.s1.agent.prompt",
168173
&agent_client_protocol::PromptRequest::new("s1", vec![]),
169174
&trogon_std::StdJsonSerialize,
170-
"acp",
175+
&test_prefix(),
171176
"s1",
172177
"req-1",
173178
Duration::from_secs(5),
@@ -187,7 +192,7 @@ mod tests {
187192
"acp.session.s1.agent.prompt",
188193
&agent_client_protocol::PromptRequest::new("s1", vec![]),
189194
&trogon_std::StdJsonSerialize,
190-
"acp",
195+
&test_prefix(),
191196
"s1",
192197
"req-1",
193198
Duration::from_secs(5),
@@ -214,7 +219,7 @@ mod tests {
214219
"acp.session.s1.agent.prompt",
215220
&agent_client_protocol::PromptRequest::new("s1", vec![]),
216221
&trogon_std::StdJsonSerialize,
217-
"acp",
222+
&test_prefix(),
218223
"s1",
219224
"req-1",
220225
Duration::from_secs(5),
@@ -239,7 +244,7 @@ mod tests {
239244
"acp.session.s1.agent.prompt",
240245
&agent_client_protocol::PromptRequest::new("s1", vec![]),
241246
&trogon_std::StdJsonSerialize,
242-
"acp",
247+
&test_prefix(),
243248
"s1",
244249
"req-1",
245250
Duration::from_secs(5),
@@ -261,7 +266,7 @@ mod tests {
261266
"acp.session.s1.agent.prompt",
262267
&agent_client_protocol::PromptRequest::new("s1", vec![]),
263268
&trogon_std::StdJsonSerialize,
264-
"acp",
269+
&test_prefix(),
265270
"s1",
266271
"req-1",
267272
Duration::from_millis(10),
@@ -298,7 +303,7 @@ mod tests {
298303
"acp.session.s1.agent.prompt",
299304
&agent_client_protocol::PromptRequest::new("s1", vec![]),
300305
&trogon_std::StdJsonSerialize,
301-
"acp",
306+
&test_prefix(),
302307
"s1",
303308
"req-1",
304309
Duration::from_secs(5),
@@ -323,7 +328,7 @@ mod tests {
323328
"acp.session.s1.agent.prompt",
324329
&agent_client_protocol::PromptRequest::new("s1", vec![]),
325330
&trogon_std::StdJsonSerialize,
326-
"acp",
331+
&test_prefix(),
327332
"s1",
328333
"req-1",
329334
Duration::from_secs(5),
@@ -355,7 +360,7 @@ mod tests {
355360
"acp.session.s1.agent.prompt",
356361
&agent_client_protocol::PromptRequest::new("s1", vec![]),
357362
&trogon_std::StdJsonSerialize,
358-
"acp",
363+
&test_prefix(),
359364
"s1",
360365
"req-1",
361366
Duration::from_secs(5),
@@ -377,7 +382,7 @@ mod tests {
377382
"acp.session.s1.agent.prompt",
378383
&agent_client_protocol::PromptRequest::new("s1", vec![]),
379384
&trogon_std::FailNextSerialize::new(1),
380-
"acp",
385+
&test_prefix(),
381386
"s1",
382387
"req-1",
383388
Duration::from_secs(5),

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,8 @@ where
206206
// JetStream consumers with DeliverAll replay from stream start, so they'll see the
207207
// response even if the runner responds before we start consuming.
208208
let sid = session_id.as_str();
209-
let pfx = prefix.as_str();
210-
let notifications_stream = streams::notifications_stream_name(pfx);
211-
let notif_config = consumers::prompt_notifications_consumer(pfx, sid, req_id);
209+
let notifications_stream = streams::notifications_stream_name(prefix);
210+
let notif_config = consumers::prompt_notifications_consumer(prefix, sid, req_id);
212211
let notif_stream = js.get_stream(&notifications_stream).await.map_err(|e| {
213212
Error::new(
214213
ErrorCode::InternalError.into(),
@@ -231,8 +230,8 @@ where
231230
)
232231
})?;
233232

234-
let responses_stream = streams::responses_stream_name(pfx);
235-
let resp_config = consumers::prompt_response_consumer(pfx, sid, req_id);
233+
let responses_stream = streams::responses_stream_name(prefix);
234+
let resp_config = consumers::prompt_response_consumer(prefix, sid, req_id);
236235
let resp_stream = js.get_stream(&responses_stream).await.map_err(|e| {
237236
Error::new(
238237
ErrorCode::InternalError.into(),

rsworkspace/crates/acp-nats/src/ext_method_name.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
//! rejects `*`, `>`, whitespace; allows dotted namespaces (e.g. `vendor.operation`) but rejects
66
//! malformed dots (consecutive, leading, trailing). Validity is guaranteed at construction.
77
8-
use trogon_nats::SubjectTokenViolation;
98
use trogon_nats::DottedNatsToken;
9+
use trogon_nats::SubjectTokenViolation;
1010

1111
/// Error returned when [`ExtMethodName`] validation fails.
1212
#[derive(Debug, Clone, PartialEq)]

rsworkspace/crates/acp-nats/src/jetstream/consumers.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,34 @@
11
use async_nats::jetstream::consumer::pull::Config;
22
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy};
33

4-
pub fn prompt_notifications_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
4+
use crate::acp_prefix::AcpPrefix;
5+
6+
pub fn prompt_notifications_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
7+
let pfx = prefix.as_str();
58
Config {
6-
filter_subject: format!("{prefix}.session.{session_id}.agent.update.{req_id}"),
9+
filter_subject: format!("{pfx}.session.{session_id}.agent.update.{req_id}"),
710
deliver_policy: DeliverPolicy::All,
811
ack_policy: AckPolicy::Explicit,
912
replay_policy: ReplayPolicy::Instant,
1013
..Default::default()
1114
}
1215
}
1316

14-
pub fn prompt_response_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
17+
pub fn prompt_response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
18+
let pfx = prefix.as_str();
1519
Config {
16-
filter_subject: format!("{prefix}.session.{session_id}.agent.prompt.response.{req_id}"),
20+
filter_subject: format!("{pfx}.session.{session_id}.agent.prompt.response.{req_id}"),
1721
deliver_policy: DeliverPolicy::All,
1822
ack_policy: AckPolicy::Explicit,
1923
replay_policy: ReplayPolicy::Instant,
2024
..Default::default()
2125
}
2226
}
2327

24-
pub fn response_consumer(prefix: &str, session_id: &str, req_id: &str) -> Config {
28+
pub fn response_consumer(prefix: &AcpPrefix, session_id: &str, req_id: &str) -> Config {
29+
let pfx = prefix.as_str();
2530
Config {
26-
filter_subject: format!("{prefix}.session.{session_id}.agent.response.{req_id}"),
31+
filter_subject: format!("{pfx}.session.{session_id}.agent.response.{req_id}"),
2732
deliver_policy: DeliverPolicy::All,
2833
ack_policy: AckPolicy::Explicit,
2934
replay_policy: ReplayPolicy::Instant,
@@ -48,9 +53,13 @@ pub fn commands_observer() -> Config {
4853
mod tests {
4954
use super::*;
5055

56+
fn p(s: &str) -> AcpPrefix {
57+
AcpPrefix::new(s).expect("test prefix")
58+
}
59+
5160
#[test]
5261
fn prompt_notifications_consumer_filter() {
53-
let config = prompt_notifications_consumer("acp", "sess-1", "req-abc");
62+
let config = prompt_notifications_consumer(&p("acp"), "sess-1", "req-abc");
5463
assert_eq!(
5564
config.filter_subject,
5665
"acp.session.sess-1.agent.update.req-abc"
@@ -59,15 +68,15 @@ mod tests {
5968

6069
#[test]
6170
fn prompt_notifications_consumer_delivers_all() {
62-
let config = prompt_notifications_consumer("acp", "s1", "r1");
71+
let config = prompt_notifications_consumer(&p("acp"), "s1", "r1");
6372
assert_eq!(config.deliver_policy, DeliverPolicy::All);
6473
assert_eq!(config.ack_policy, AckPolicy::Explicit);
6574
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
6675
}
6776

6877
#[test]
6978
fn prompt_response_consumer_filter() {
70-
let config = prompt_response_consumer("acp", "sess-1", "req-abc");
79+
let config = prompt_response_consumer(&p("acp"), "sess-1", "req-abc");
7180
assert_eq!(
7281
config.filter_subject,
7382
"acp.session.sess-1.agent.prompt.response.req-abc"
@@ -89,7 +98,7 @@ mod tests {
8998

9099
#[test]
91100
fn response_consumer_filter() {
92-
let config = response_consumer("acp", "sess-1", "req-abc");
101+
let config = response_consumer(&p("acp"), "sess-1", "req-abc");
93102
assert_eq!(
94103
config.filter_subject,
95104
"acp.session.sess-1.agent.response.req-abc"
@@ -98,21 +107,21 @@ mod tests {
98107

99108
#[test]
100109
fn response_consumer_delivers_all() {
101-
let config = response_consumer("acp", "s1", "r1");
110+
let config = response_consumer(&p("acp"), "s1", "r1");
102111
assert_eq!(config.deliver_policy, DeliverPolicy::All);
103112
assert_eq!(config.ack_policy, AckPolicy::Explicit);
104113
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
105114
}
106115

107116
#[test]
108117
fn response_consumer_custom_prefix() {
109-
let config = response_consumer("myapp", "s1", "r1");
118+
let config = response_consumer(&p("myapp"), "s1", "r1");
110119
assert_eq!(config.filter_subject, "myapp.session.s1.agent.response.r1");
111120
}
112121

113122
#[test]
114123
fn custom_prefix_in_consumers() {
115-
let config = prompt_response_consumer("myapp", "s1", "r1");
124+
let config = prompt_response_consumer(&p("myapp"), "s1", "r1");
116125
assert_eq!(
117126
config.filter_subject,
118127
"myapp.session.s1.agent.prompt.response.r1"

rsworkspace/crates/acp-nats/src/jetstream/provision.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ impl std::error::Error for ProvisionError {}
1616

1717
pub async fn provision_streams<J: JetStreamContext>(
1818
js: &J,
19-
prefix: &str,
19+
prefix: &crate::acp_prefix::AcpPrefix,
2020
) -> Result<(), ProvisionError> {
2121
for config in streams::all_configs(prefix) {
2222
let name = config.name.clone();
@@ -31,19 +31,24 @@ pub async fn provision_streams<J: JetStreamContext>(
3131
#[cfg(test)]
3232
mod tests {
3333
use super::*;
34+
use crate::acp_prefix::AcpPrefix;
3435
use trogon_nats::jetstream::MockJetStreamContext;
3536

37+
fn p(s: &str) -> AcpPrefix {
38+
AcpPrefix::new(s).expect("test prefix")
39+
}
40+
3641
#[tokio::test]
3742
async fn provision_creates_six_streams() {
3843
let ctx = MockJetStreamContext::new();
39-
provision_streams(&ctx, "acp").await.unwrap();
44+
provision_streams(&ctx, &p("acp")).await.unwrap();
4045
assert_eq!(ctx.created_streams().len(), 6);
4146
}
4247

4348
#[tokio::test]
4449
async fn provision_creates_correct_stream_names() {
4550
let ctx = MockJetStreamContext::new();
46-
provision_streams(&ctx, "acp").await.unwrap();
51+
provision_streams(&ctx, &p("acp")).await.unwrap();
4752
let names: Vec<String> = ctx
4853
.created_streams()
4954
.iter()
@@ -60,7 +65,7 @@ mod tests {
6065
#[tokio::test]
6166
async fn provision_with_custom_prefix() {
6267
let ctx = MockJetStreamContext::new();
63-
provision_streams(&ctx, "myapp").await.unwrap();
68+
provision_streams(&ctx, &p("myapp")).await.unwrap();
6469
let names: Vec<String> = ctx
6570
.created_streams()
6671
.iter()
@@ -73,16 +78,16 @@ mod tests {
7378
async fn provision_returns_error_on_failure() {
7479
let ctx = MockJetStreamContext::new();
7580
ctx.fail_next();
76-
let result = provision_streams(&ctx, "acp").await;
81+
let result = provision_streams(&ctx, &p("acp")).await;
7782
assert!(result.is_err());
7883
assert!(result.unwrap_err().to_string().contains("ACP_COMMANDS"));
7984
}
8085

8186
#[tokio::test]
8287
async fn provision_is_idempotent() {
8388
let ctx = MockJetStreamContext::new();
84-
provision_streams(&ctx, "acp").await.unwrap();
85-
provision_streams(&ctx, "acp").await.unwrap();
89+
provision_streams(&ctx, &p("acp")).await.unwrap();
90+
provision_streams(&ctx, &p("acp")).await.unwrap();
8691
assert_eq!(ctx.created_streams().len(), 12);
8792
}
8893
}

0 commit comments

Comments
 (0)