Skip to content

Commit 12a58bb

Browse files
committed
refactor(acp-nats): per-subject structs with typed parameters
Each NATS subject is its own struct. Constructors accept AcpPrefix and AcpSessionId instead of raw strings — no primitive obsession, no way to pass unvalidated values. Passing the wrong subject to the wrong function is a compile error. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent d187733 commit 12a58bb

66 files changed

Lines changed: 2150 additions & 649 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 108 additions & 69 deletions
Large diffs are not rendered by default.

rsworkspace/crates/acp-nats/src/agent/authenticate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2121

2222
let result = nats::request_with_timeout::<N, AuthenticateRequest, AuthenticateResponse>(
2323
nats,
24-
&agent::authenticate(bridge.config.acp_prefix()),
24+
&agent::AuthenticateSubject::new(bridge.config.acp_prefix_ref()),
2525
&args,
2626
bridge.config.operation_timeout,
2727
)

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl<N, C: GetElapsed, J> Bridge<N, C, J> {
108108
impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J> Bridge<N, C, J> {
109109
pub(crate) fn schedule_session_ready(&self, session_id: SessionId) {
110110
let nats = self.nats.clone();
111-
let prefix = self.config.acp_prefix().to_string();
111+
let prefix = self.config.acp_prefix_ref().clone();
112112
let metrics = self.metrics.clone();
113113
let handle = tokio::spawn(async move {
114114
publish_session_ready(&nats, &prefix, &session_id, &metrics).await;
@@ -119,13 +119,14 @@ impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J>
119119

120120
async fn publish_session_ready<N: PublishClient + FlushClient>(
121121
nats: &N,
122-
prefix: &str,
122+
prefix: &crate::acp_prefix::AcpPrefix,
123123
session_id: &SessionId,
124124
metrics: &Metrics,
125125
) {
126126
tokio::time::sleep(SESSION_READY_DELAY).await;
127127

128-
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
128+
let acp_session_id = crate::session_id::AcpSessionId::new(session_id.to_string()).unwrap();
129+
let subject = session::agent::ExtReadySubject::new(prefix, &acp_session_id);
129130
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
130131

131132
let message = ExtSessionReady::new(session_id.clone());
@@ -157,7 +158,7 @@ where
157158
{
158159
pub(crate) async fn session_request<Req, Res>(
159160
&self,
160-
subject: &str,
161+
subject: &impl std::fmt::Display,
161162
args: &Req,
162163
session_id: &str,
163164
) -> Result<Res>
@@ -167,12 +168,13 @@ where
167168
{
168169
use crate::error::map_nats_error;
169170

171+
let subject_str = subject.to_string();
170172
match self.js() {
171173
Some(js) => {
172174
let req_id = uuid::Uuid::new_v4().to_string();
173175
js_request::js_request::<J, _, Res, _>(
174176
js,
175-
subject,
177+
&subject_str,
176178
args,
177179
&trogon_std::StdJsonSerialize,
178180
self.config.acp_prefix(),
@@ -182,9 +184,9 @@ where
182184
)
183185
.await
184186
}
185-
None => nats::request_with_timeout::<N, Req, Res>(
187+
None => trogon_nats::request_with_timeout::<N, Req, Res>(
186188
self.nats(),
187-
subject,
189+
&subject_str,
188190
args,
189191
self.config.operation_timeout,
190192
)

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
2323

2424
info!(session_id = %args.session_id, "Cancel notification");
2525

26-
AcpSessionId::try_from(&args.session_id).map_err(|e| {
26+
let session_id = AcpSessionId::try_from(&args.session_id).map_err(|e| {
2727
bridge
2828
.metrics
2929
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), false);
@@ -34,7 +34,8 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3434
)
3535
})?;
3636

37-
let subject = session::agent::cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
37+
let prefix = bridge.config.acp_prefix_ref();
38+
let subject = session::agent::CancelSubject::new(prefix, &session_id);
3839

3940
let publish_result = nats::publish(
4041
bridge.nats(),
@@ -57,8 +58,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
5758
.record_error("cancel", "cancel_publish_failed");
5859
}
5960

60-
let cancelled_subject =
61-
session::agent::cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
61+
let cancelled_subject = session::agent::CancelledSubject::new(prefix, &session_id);
6262
if let Err(e) = bridge
6363
.nats()
6464
.publish_with_headers(

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ where
3535
format!("Invalid session ID: {}", e),
3636
)
3737
})?;
38-
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::close(prefix, session_id.as_str());
38+
let prefix = bridge.config.acp_prefix_ref();
39+
let subject = session::agent::CloseSubject::new(prefix, &session_id);
4040

4141
let result = bridge
4242
.session_request::<CloseSessionRequest, CloseSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/ext_method.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
3535
})?;
3636

3737
let nats = bridge.nats();
38-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
38+
let subject = agent::ExtSubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
3939

4040
let result = nats::request_with_timeout::<N, ExtRequest, ExtResponse>(
4141
nats,

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
41+
let subject =
42+
agent::ExtNotifySubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
4243

4344
let publish_result = nats::publish(
4445
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ where
3535
format!("Invalid session ID: {}", e),
3636
)
3737
})?;
38-
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::fork(prefix, session_id.as_str());
38+
let prefix = bridge.config.acp_prefix_ref();
39+
let subject = session::agent::ForkSubject::new(prefix, &session_id);
4040

4141
let result = bridge
4242
.session_request::<ForkSessionRequest, ForkSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/initialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2525
info!(client = %client_name, "Initialize request");
2626

2727
let nats = bridge.nats();
28-
let subject = agent::initialize(bridge.config.acp_prefix());
28+
let subject = agent::InitializeSubject::new(bridge.config.acp_prefix_ref());
2929

3030
let result = nats::request_with_timeout::<N, InitializeRequest, InitializeResponse>(
3131
nats,

rsworkspace/crates/acp-nats/src/agent/list_sessions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
1515
info!("List sessions request");
1616

1717
let nats = bridge.nats();
18-
let subject = agent::session_list(bridge.config.acp_prefix());
18+
let subject = agent::SessionListSubject::new(bridge.config.acp_prefix_ref());
1919

2020
let result = nats::request_with_timeout::<N, ListSessionsRequest, ListSessionsResponse>(
2121
nats,

0 commit comments

Comments
 (0)