Skip to content

Commit 4f5a90e

Browse files
committed
refactor(acp-nats): typed subjects enforce correct transport at compile time
Introduce RequestSubject, PublishSubject, and JetStreamSubject newtypes. Subject builders return the type matching their transport. The wrapper functions in nats::mod only accept the correct type — passing a JetStreamSubject to request_with_timeout is a compile error. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent d187733 commit 4f5a90e

6 files changed

Lines changed: 179 additions & 46 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ where
157157
{
158158
pub(crate) async fn session_request<Req, Res>(
159159
&self,
160-
subject: &str,
160+
subject: &crate::nats::JetStreamSubject,
161161
args: &Req,
162162
session_id: &str,
163163
) -> Result<Res>
@@ -172,7 +172,7 @@ where
172172
let req_id = uuid::Uuid::new_v4().to_string();
173173
js_request::js_request::<J, _, Res, _>(
174174
js,
175-
subject,
175+
&subject.0,
176176
args,
177177
&trogon_std::StdJsonSerialize,
178178
self.config.acp_prefix(),
@@ -182,9 +182,9 @@ where
182182
)
183183
.await
184184
}
185-
None => nats::request_with_timeout::<N, Req, Res>(
185+
None => trogon_nats::request_with_timeout::<N, Req, Res>(
186186
self.nats(),
187-
subject,
187+
&subject.0,
188188
args,
189189
self.config.operation_timeout,
190190
)

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
6262
if let Err(e) = bridge
6363
.nats()
6464
.publish_with_headers(
65-
cancelled_subject,
65+
cancelled_subject.0,
6666
async_nats::HeaderMap::new(),
6767
bytes::Bytes::new(),
6868
)

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
41+
let subject = agent::ext_notify(bridge.config.acp_prefix(), method_name.as_str());
4242

4343
let publish_result = nats::publish(
4444
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ where
9090

9191
let mut cancel_sub = bridge
9292
.nats
93-
.subscribe(session::agent::cancelled(prefix, sid))
93+
.subscribe(session::agent::cancelled(prefix, sid).0)
9494
.await
9595
.map_err(|e| {
9696
Error::new(
@@ -110,7 +110,7 @@ where
110110
let prompt_subject = session::agent::prompt(prefix, sid);
111111
bridge
112112
.nats
113-
.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
113+
.publish_with_headers(prompt_subject.0, headers, Bytes::from(payload_bytes))
114114
.await
115115
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("publish: {e}")))?;
116116

@@ -253,7 +253,7 @@ where
253253
// Cancel still uses core NATS — it's a fire-and-forget signal, not persisted.
254254
let mut cancel_sub = bridge
255255
.nats
256-
.subscribe(session::agent::cancelled(prefix, sid))
256+
.subscribe(session::agent::cancelled(prefix, sid).0)
257257
.await
258258
.map_err(|e| {
259259
Error::new(
@@ -272,7 +272,7 @@ where
272272
headers.insert(SESSION_ID_HEADER, sid);
273273

274274
let prompt_subject = session::agent::prompt(prefix, sid);
275-
js.publish_with_headers(prompt_subject, headers, Bytes::from(payload_bytes))
275+
js.publish_with_headers(prompt_subject.0, headers, Bytes::from(payload_bytes))
276276
.await
277277
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("js publish: {e}")))?
278278
.await

rsworkspace/crates/acp-nats/src/nats/mod.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,44 @@ pub mod parsing;
33
mod subjects;
44
pub(crate) mod token;
55

6+
use serde::Serialize;
7+
use serde::de::DeserializeOwned;
8+
use std::time::Duration;
9+
610
pub use extensions::ExtSessionReady;
711
pub use parsing::{
812
ClientMethod, GlobalAgentMethod, ParsedAgentSubject, ParsedClientSubject, SessionAgentMethod,
913
parse_agent_subject, parse_client_subject,
1014
};
11-
pub use subjects::{agent, session};
15+
pub use subjects::{JetStreamSubject, PublishSubject, RequestSubject, agent, session};
1216
pub use trogon_nats::{
13-
FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient, RetryPolicy,
14-
SubscribeClient, client, connect, headers_with_trace_context, inject_trace_context, publish,
15-
request_with_timeout,
17+
FlushClient, FlushPolicy, NatsError, PublishClient, PublishOptions, RequestClient, RetryPolicy,
18+
SubscribeClient, client, connect, headers_with_trace_context, inject_trace_context,
1619
};
20+
21+
/// Core NATS request/reply — only accepts [`RequestSubject`].
22+
pub async fn request_with_timeout<N: RequestClient, Req, Res>(
23+
client: &N,
24+
subject: &RequestSubject,
25+
request: &Req,
26+
timeout: Duration,
27+
) -> Result<Res, NatsError>
28+
where
29+
Req: Serialize,
30+
Res: DeserializeOwned,
31+
{
32+
trogon_nats::request_with_timeout(client, &subject.0, request, timeout).await
33+
}
34+
35+
/// Core NATS fire-and-forget publish — only accepts [`PublishSubject`].
36+
pub async fn publish<N: PublishClient + FlushClient, Req>(
37+
client: &N,
38+
subject: &PublishSubject,
39+
request: &Req,
40+
options: PublishOptions,
41+
) -> Result<(), NatsError>
42+
where
43+
Req: Serialize,
44+
{
45+
trogon_nats::publish(client, &subject.0, request, options).await
46+
}

rsworkspace/crates/acp-nats/src/nats/subjects.rs

Lines changed: 135 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,106 @@
1+
/// Bridge → agent via Core NATS request/reply. Expects a response.
2+
#[derive(Debug)]
3+
#[repr(transparent)]
4+
pub struct RequestSubject(pub(crate) String);
5+
6+
/// Bridge → agent via Core NATS publish. Fire-and-forget, no response.
7+
#[derive(Debug)]
8+
#[repr(transparent)]
9+
pub struct PublishSubject(pub(crate) String);
10+
11+
/// Bridge → agent via JetStream. Captured by a stream, consumed by agent.
12+
#[derive(Debug)]
13+
#[repr(transparent)]
14+
pub struct JetStreamSubject(pub(crate) String);
15+
16+
impl std::fmt::Display for RequestSubject {
17+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18+
self.0.fmt(f)
19+
}
20+
}
21+
22+
impl std::fmt::Display for PublishSubject {
23+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24+
self.0.fmt(f)
25+
}
26+
}
27+
28+
impl std::fmt::Display for JetStreamSubject {
29+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30+
self.0.fmt(f)
31+
}
32+
}
33+
34+
impl PartialEq<&str> for RequestSubject {
35+
fn eq(&self, other: &&str) -> bool {
36+
self.0 == *other
37+
}
38+
}
39+
40+
impl PartialEq<&str> for PublishSubject {
41+
fn eq(&self, other: &&str) -> bool {
42+
self.0 == *other
43+
}
44+
}
45+
46+
impl PartialEq<&str> for JetStreamSubject {
47+
fn eq(&self, other: &&str) -> bool {
48+
self.0 == *other
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
impl RequestSubject {
54+
pub fn starts_with(&self, prefix: &str) -> bool {
55+
self.0.starts_with(prefix)
56+
}
57+
}
58+
59+
#[cfg(test)]
60+
impl PublishSubject {
61+
pub fn starts_with(&self, prefix: &str) -> bool {
62+
self.0.starts_with(prefix)
63+
}
64+
}
65+
66+
#[cfg(test)]
67+
impl JetStreamSubject {
68+
pub fn starts_with(&self, prefix: &str) -> bool {
69+
self.0.starts_with(prefix)
70+
}
71+
}
72+
173
pub mod agent {
2-
pub fn initialize(prefix: &str) -> String {
3-
format!("{}.agent.initialize", prefix)
74+
use super::{PublishSubject, RequestSubject};
75+
76+
/// Core NATS request/reply. Stream: GLOBAL (observability).
77+
pub fn initialize(prefix: &str) -> RequestSubject {
78+
RequestSubject(format!("{}.agent.initialize", prefix))
479
}
580

6-
pub fn authenticate(prefix: &str) -> String {
7-
format!("{}.agent.authenticate", prefix)
81+
/// Core NATS request/reply. Stream: GLOBAL (observability).
82+
pub fn authenticate(prefix: &str) -> RequestSubject {
83+
RequestSubject(format!("{}.agent.authenticate", prefix))
884
}
985

10-
pub fn session_new(prefix: &str) -> String {
11-
format!("{}.agent.session.new", prefix)
86+
/// Core NATS request/reply. Stream: GLOBAL (observability).
87+
pub fn session_new(prefix: &str) -> RequestSubject {
88+
RequestSubject(format!("{}.agent.session.new", prefix))
1289
}
1390

14-
pub fn session_list(prefix: &str) -> String {
15-
format!("{}.agent.session.list", prefix)
91+
/// Core NATS request/reply. No stream.
92+
pub fn session_list(prefix: &str) -> RequestSubject {
93+
RequestSubject(format!("{}.agent.session.list", prefix))
1694
}
1795

18-
pub fn ext(prefix: &str, method: &str) -> String {
19-
format!("{}.agent.ext.{}", prefix, method)
96+
/// Core NATS request/reply. Stream: GLOBAL_EXT (observability).
97+
pub fn ext(prefix: &str, method: &str) -> RequestSubject {
98+
RequestSubject(format!("{}.agent.ext.{}", prefix, method))
99+
}
100+
101+
/// Core NATS publish (fire-and-forget notification). Stream: GLOBAL_EXT (observability).
102+
pub fn ext_notify(prefix: &str, method: &str) -> PublishSubject {
103+
PublishSubject(format!("{}.agent.ext.{}", prefix, method))
20104
}
21105

22106
pub mod wildcards {
@@ -28,65 +112,84 @@ pub mod agent {
28112

29113
pub mod session {
30114
pub mod agent {
31-
pub fn load(prefix: &str, session_id: &str) -> String {
32-
format!("{}.session.{}.agent.load", prefix, session_id)
115+
use super::super::{JetStreamSubject, PublishSubject};
116+
117+
/// Bridge: JetStream publish. Stream: COMMANDS.
118+
pub fn load(prefix: &str, session_id: &str) -> JetStreamSubject {
119+
JetStreamSubject(format!("{}.session.{}.agent.load", prefix, session_id))
33120
}
34121

35-
pub fn prompt(prefix: &str, session_id: &str) -> String {
36-
format!("{}.session.{}.agent.prompt", prefix, session_id)
122+
/// Bridge: JetStream publish. Stream: COMMANDS.
123+
pub fn prompt(prefix: &str, session_id: &str) -> JetStreamSubject {
124+
JetStreamSubject(format!("{}.session.{}.agent.prompt", prefix, session_id))
37125
}
38126

39127
pub fn prompt_wildcard(prefix: &str) -> String {
40128
format!("{}.session.*.agent.prompt", prefix)
41129
}
42130

43-
pub fn cancel(prefix: &str, session_id: &str) -> String {
44-
format!("{}.session.{}.agent.cancel", prefix, session_id)
131+
/// Bridge: Core NATS publish (fire-and-forget). Stream: COMMANDS.
132+
pub fn cancel(prefix: &str, session_id: &str) -> PublishSubject {
133+
PublishSubject(format!("{}.session.{}.agent.cancel", prefix, session_id))
45134
}
46135

47-
pub fn cancelled(prefix: &str, session_id: &str) -> String {
48-
format!("{}.session.{}.agent.cancelled", prefix, session_id)
136+
/// Agent → bridge broadcast. Stream: RESPONSES.
137+
pub fn cancelled(prefix: &str, session_id: &str) -> PublishSubject {
138+
PublishSubject(format!("{}.session.{}.agent.cancelled", prefix, session_id))
49139
}
50140

51-
pub fn set_mode(prefix: &str, session_id: &str) -> String {
52-
format!("{}.session.{}.agent.set_mode", prefix, session_id)
141+
/// Bridge: JetStream publish. Stream: COMMANDS.
142+
pub fn set_mode(prefix: &str, session_id: &str) -> JetStreamSubject {
143+
JetStreamSubject(format!("{}.session.{}.agent.set_mode", prefix, session_id))
53144
}
54145

55-
pub fn set_config_option(prefix: &str, session_id: &str) -> String {
56-
format!("{}.session.{}.agent.set_config_option", prefix, session_id)
146+
/// Bridge: JetStream publish. Stream: COMMANDS.
147+
pub fn set_config_option(prefix: &str, session_id: &str) -> JetStreamSubject {
148+
JetStreamSubject(format!(
149+
"{}.session.{}.agent.set_config_option",
150+
prefix, session_id
151+
))
57152
}
58153

59-
pub fn set_model(prefix: &str, session_id: &str) -> String {
60-
format!("{}.session.{}.agent.set_model", prefix, session_id)
154+
/// Bridge: JetStream publish. Stream: COMMANDS.
155+
pub fn set_model(prefix: &str, session_id: &str) -> JetStreamSubject {
156+
JetStreamSubject(format!("{}.session.{}.agent.set_model", prefix, session_id))
61157
}
62158

63-
pub fn fork(prefix: &str, session_id: &str) -> String {
64-
format!("{}.session.{}.agent.fork", prefix, session_id)
159+
/// Bridge: JetStream publish. Stream: COMMANDS.
160+
pub fn fork(prefix: &str, session_id: &str) -> JetStreamSubject {
161+
JetStreamSubject(format!("{}.session.{}.agent.fork", prefix, session_id))
65162
}
66163

67-
pub fn resume(prefix: &str, session_id: &str) -> String {
68-
format!("{}.session.{}.agent.resume", prefix, session_id)
164+
/// Bridge: JetStream publish. Stream: COMMANDS.
165+
pub fn resume(prefix: &str, session_id: &str) -> JetStreamSubject {
166+
JetStreamSubject(format!("{}.session.{}.agent.resume", prefix, session_id))
69167
}
70168

71-
pub fn close(prefix: &str, session_id: &str) -> String {
72-
format!("{}.session.{}.agent.close", prefix, session_id)
169+
/// Bridge: JetStream publish. Stream: COMMANDS.
170+
pub fn close(prefix: &str, session_id: &str) -> JetStreamSubject {
171+
JetStreamSubject(format!("{}.session.{}.agent.close", prefix, session_id))
73172
}
74173

75-
pub fn ext_ready(prefix: &str, session_id: &str) -> String {
76-
format!("{}.session.{}.agent.ext.ready", prefix, session_id)
174+
/// Agent → bridge signal. Stream: RESPONSES.
175+
pub fn ext_ready(prefix: &str, session_id: &str) -> PublishSubject {
176+
PublishSubject(format!("{}.session.{}.agent.ext.ready", prefix, session_id))
77177
}
78178

179+
/// Agent → bridge async notification. Stream: NOTIFICATIONS.
79180
pub fn update(prefix: &str, session_id: &str, req_id: &str) -> String {
80181
format!("{}.session.{}.agent.update.{}", prefix, session_id, req_id)
81182
}
82183

184+
/// Agent → bridge streamed response. Stream: RESPONSES.
83185
pub fn prompt_response(prefix: &str, session_id: &str, req_id: &str) -> String {
84186
format!(
85187
"{}.session.{}.agent.prompt.response.{}",
86188
prefix, session_id, req_id
87189
)
88190
}
89191

192+
/// Agent → bridge one-shot response. Stream: RESPONSES.
90193
pub fn response(prefix: &str, session_id: &str, req_id: &str) -> String {
91194
format!(
92195
"{}.session.{}.agent.response.{}",

0 commit comments

Comments
 (0)