Skip to content

Commit 4bc1d90

Browse files
committed
refactor(acp-nats): per-subject structs enforce correct NATS usage at compile time
Each NATS subject is now its own struct with typed parameters. Passing the wrong subject to the wrong function is a compile error. Organized into directories by role: - commands/ — bridge→agent JetStream commands - global/ — global Core NATS request/reply - responses/ — agent→bridge responses - client_ops/ — agent→bridge client proxy - subscriptions/ — wildcard patterns for .subscribe() Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent d187733 commit 4bc1d90

65 files changed

Lines changed: 1930 additions & 565 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use agent_client_protocol::{
1212
};
1313
use async_nats::Message;
1414
use async_nats::jetstream::AckKind;
15+
use async_nats::subject::ToSubject;
1516
#[cfg(test)]
1617
use bytes::Bytes;
1718
use futures::StreamExt;
@@ -160,8 +161,8 @@ where
160161
N: SubscribeClient + PublishClient + FlushClient + Clone + 'static,
161162
A: Agent + 'static,
162163
{
163-
let global_wildcard = acp_nats::nats::agent::wildcards::all(prefix);
164-
let session_wildcard = acp_nats::nats::session::wildcards::all_agent(prefix);
164+
let global_wildcard = acp_nats::nats::agent::wildcards::GlobalAllSubject::new(prefix);
165+
let session_wildcard = acp_nats::nats::session::wildcards::AllAgentSubject::new(prefix);
165166

166167
info!(
167168
global = %global_wildcard,
@@ -206,8 +207,8 @@ where
206207
N: SubscribeClient + PublishClient + FlushClient + Clone + 'static,
207208
A: Agent + 'static,
208209
{
209-
let global_wildcard = acp_nats::nats::agent::wildcards::all(prefix);
210-
let ext_wildcard = acp_nats::nats::session::wildcards::all_agent_ext(prefix);
210+
let global_wildcard = acp_nats::nats::agent::wildcards::GlobalAllSubject::new(prefix);
211+
let ext_wildcard = acp_nats::nats::session::wildcards::AllAgentExtSubject::new(prefix);
211212

212213
info!(
213214
global = %global_wildcard,
@@ -568,16 +569,20 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
568569
.and_then(|h| h.get(trogon_nats::REQ_ID_HEADER))
569570
.map(|v| v.as_str().to_string());
570571

571-
let reply_subject = match (&req_id, &method) {
572+
let reply_subject: Option<async_nats::subject::Subject> = match (&req_id, &method) {
572573
(Some(rid), SessionAgentMethod::Prompt) => Some(
573-
acp_nats::nats::session::agent::prompt_response(prefix, session_id.as_str(), rid),
574+
acp_nats::nats::session::agent::PromptResponseSubject::new(
575+
prefix,
576+
session_id.as_str(),
577+
rid,
578+
)
579+
.to_subject(),
574580
),
575581
(_, SessionAgentMethod::Cancel) => None,
576-
(Some(rid), _) => Some(acp_nats::nats::session::agent::response(
577-
prefix,
578-
session_id.as_str(),
579-
rid,
580-
)),
582+
(Some(rid), _) => Some(
583+
acp_nats::nats::session::agent::ResponseSubject::new(prefix, session_id.as_str(), rid)
584+
.to_subject(),
585+
),
581586
(None, _) => {
582587
warn!(subject, "JetStream message missing X-Req-Id header");
583588
None
@@ -587,7 +592,7 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
587592
let inner = js_msg.message();
588593
let msg = Message {
589594
subject: subject.as_str().into(),
590-
reply: reply_subject.as_deref().map(|s| s.into()),
595+
reply: reply_subject,
591596
payload: inner.payload.clone(),
592597
headers: inner.headers.clone(),
593598
status: None,

rsworkspace/crates/acp-nats/src/agent/authenticate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2121

2222
let result = nats::request_with_timeout::<N, AuthenticateRequest, AuthenticateResponse>(
2323
nats,
24-
&agent::authenticate(bridge.config.acp_prefix()),
24+
&agent::AuthenticateSubject::new(bridge.config.acp_prefix()),
2525
&args,
2626
bridge.config.operation_timeout,
2727
)

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
125125
) {
126126
tokio::time::sleep(SESSION_READY_DELAY).await;
127127

128-
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
128+
let subject = session::agent::ExtReadySubject::new(prefix, &session_id.to_string());
129129
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
130130

131131
let message = ExtSessionReady::new(session_id.clone());
@@ -157,7 +157,7 @@ where
157157
{
158158
pub(crate) async fn session_request<Req, Res>(
159159
&self,
160-
subject: &str,
160+
subject: &impl std::fmt::Display,
161161
args: &Req,
162162
session_id: &str,
163163
) -> Result<Res>
@@ -167,12 +167,13 @@ where
167167
{
168168
use crate::error::map_nats_error;
169169

170+
let subject_str = subject.to_string();
170171
match self.js() {
171172
Some(js) => {
172173
let req_id = uuid::Uuid::new_v4().to_string();
173174
js_request::js_request::<J, _, Res, _>(
174175
js,
175-
subject,
176+
&subject_str,
176177
args,
177178
&trogon_std::StdJsonSerialize,
178179
self.config.acp_prefix(),
@@ -182,9 +183,9 @@ where
182183
)
183184
.await
184185
}
185-
None => nats::request_with_timeout::<N, Req, Res>(
186+
None => trogon_nats::request_with_timeout::<N, Req, Res>(
186187
self.nats(),
187-
subject,
188+
&subject_str,
188189
args,
189190
self.config.operation_timeout,
190191
)

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3434
)
3535
})?;
3636

37-
let subject = session::agent::cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
37+
let subject = session::agent::CancelSubject::new(
38+
bridge.config.acp_prefix(),
39+
&args.session_id.to_string(),
40+
);
3841

3942
let publish_result = nats::publish(
4043
bridge.nats(),
@@ -57,8 +60,10 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
5760
.record_error("cancel", "cancel_publish_failed");
5861
}
5962

60-
let cancelled_subject =
61-
session::agent::cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
63+
let cancelled_subject = session::agent::CancelledSubject::new(
64+
bridge.config.acp_prefix(),
65+
&args.session_id.to_string(),
66+
);
6267
if let Err(e) = bridge
6368
.nats()
6469
.publish_with_headers(

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
)
3737
})?;
3838
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::close(prefix, session_id.as_str());
39+
let subject = session::agent::CloseSubject::new(prefix, session_id.as_str());
4040

4141
let result = bridge
4242
.session_request::<CloseSessionRequest, CloseSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/ext_method.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
3535
})?;
3636

3737
let nats = bridge.nats();
38-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
38+
let subject = agent::ExtSubject::new(bridge.config.acp_prefix(), method_name.as_str());
3939

4040
let result = nats::request_with_timeout::<N, ExtRequest, ExtResponse>(
4141
nats,

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
41+
let subject = agent::ExtNotifySubject::new(bridge.config.acp_prefix(), method_name.as_str());
4242

4343
let publish_result = nats::publish(
4444
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
)
3737
})?;
3838
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::fork(prefix, session_id.as_str());
39+
let subject = session::agent::ForkSubject::new(prefix, session_id.as_str());
4040

4141
let result = bridge
4242
.session_request::<ForkSessionRequest, ForkSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/initialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2525
info!(client = %client_name, "Initialize request");
2626

2727
let nats = bridge.nats();
28-
let subject = agent::initialize(bridge.config.acp_prefix());
28+
let subject = agent::InitializeSubject::new(bridge.config.acp_prefix());
2929

3030
let result = nats::request_with_timeout::<N, InitializeRequest, InitializeResponse>(
3131
nats,

rsworkspace/crates/acp-nats/src/agent/list_sessions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
1515
info!("List sessions request");
1616

1717
let nats = bridge.nats();
18-
let subject = agent::session_list(bridge.config.acp_prefix());
18+
let subject = agent::SessionListSubject::new(bridge.config.acp_prefix());
1919

2020
let result = nats::request_with_timeout::<N, ListSessionsRequest, ListSessionsResponse>(
2121
nats,

0 commit comments

Comments
 (0)