Skip to content

Commit dfdba45

Browse files
committed
refactor(acp-nats): per-subject structs with typed parameters
Each NATS subject is its own struct. Constructors accept AcpPrefix and AcpSessionId instead of raw strings — no primitive obsession, no way to pass unvalidated values. Passing the wrong subject to the wrong function is a compile error. Also fixes a potential panic in publish_session_ready where an unvalidated SessionId was passed to AcpSessionId::new().unwrap(). Now gracefully logs and returns on invalid session IDs. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent d187733 commit dfdba45

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2257
-649
lines changed

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 108 additions & 69 deletions
Large diffs are not rendered by default.

rsworkspace/crates/acp-nats/src/agent/authenticate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2121

2222
let result = nats::request_with_timeout::<N, AuthenticateRequest, AuthenticateResponse>(
2323
nats,
24-
&agent::authenticate(bridge.config.acp_prefix()),
24+
&agent::AuthenticateSubject::new(bridge.config.acp_prefix_ref()),
2525
&args,
2626
bridge.config.operation_timeout,
2727
)

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl<N, C: GetElapsed, J> Bridge<N, C, J> {
108108
impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J> Bridge<N, C, J> {
109109
pub(crate) fn schedule_session_ready(&self, session_id: SessionId) {
110110
let nats = self.nats.clone();
111-
let prefix = self.config.acp_prefix().to_string();
111+
let prefix = self.config.acp_prefix_ref().clone();
112112
let metrics = self.metrics.clone();
113113
let handle = tokio::spawn(async move {
114114
publish_session_ready(&nats, &prefix, &session_id, &metrics).await;
@@ -119,13 +119,21 @@ impl<N: PublishClient + FlushClient + Clone + Send + 'static, C: GetElapsed, J>
119119

120120
async fn publish_session_ready<N: PublishClient + FlushClient>(
121121
nats: &N,
122-
prefix: &str,
122+
prefix: &crate::acp_prefix::AcpPrefix,
123123
session_id: &SessionId,
124124
metrics: &Metrics,
125125
) {
126126
tokio::time::sleep(SESSION_READY_DELAY).await;
127127

128-
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
128+
let acp_session_id = match crate::session_id::AcpSessionId::new(session_id.to_string()) {
129+
Ok(id) => id,
130+
Err(e) => {
131+
warn!(session_id = %session_id, error = %e, "Invalid session ID from backend, skipping session.ready");
132+
metrics.record_error("session_ready", "invalid_session_id");
133+
return;
134+
}
135+
};
136+
let subject = session::agent::ExtReadySubject::new(prefix, &acp_session_id);
129137
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
130138

131139
let message = ExtSessionReady::new(session_id.clone());
@@ -157,7 +165,7 @@ where
157165
{
158166
pub(crate) async fn session_request<Req, Res>(
159167
&self,
160-
subject: &str,
168+
subject: &impl std::fmt::Display,
161169
args: &Req,
162170
session_id: &str,
163171
) -> Result<Res>
@@ -167,12 +175,13 @@ where
167175
{
168176
use crate::error::map_nats_error;
169177

178+
let subject_str = subject.to_string();
170179
match self.js() {
171180
Some(js) => {
172181
let req_id = uuid::Uuid::new_v4().to_string();
173182
js_request::js_request::<J, _, Res, _>(
174183
js,
175-
subject,
184+
&subject_str,
176185
args,
177186
&trogon_std::StdJsonSerialize,
178187
self.config.acp_prefix(),
@@ -182,9 +191,9 @@ where
182191
)
183192
.await
184193
}
185-
None => nats::request_with_timeout::<N, Req, Res>(
194+
None => trogon_nats::request_with_timeout::<N, Req, Res>(
186195
self.nats(),
187-
subject,
196+
&subject_str,
188197
args,
189198
self.config.operation_timeout,
190199
)

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
2323

2424
info!(session_id = %args.session_id, "Cancel notification");
2525

26-
AcpSessionId::try_from(&args.session_id).map_err(|e| {
26+
let session_id = AcpSessionId::try_from(&args.session_id).map_err(|e| {
2727
bridge
2828
.metrics
2929
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), false);
@@ -34,7 +34,8 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3434
)
3535
})?;
3636

37-
let subject = session::agent::cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
37+
let prefix = bridge.config.acp_prefix_ref();
38+
let subject = session::agent::CancelSubject::new(prefix, &session_id);
3839

3940
let publish_result = nats::publish(
4041
bridge.nats(),
@@ -57,8 +58,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
5758
.record_error("cancel", "cancel_publish_failed");
5859
}
5960

60-
let cancelled_subject =
61-
session::agent::cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
61+
let cancelled_subject = session::agent::CancelledSubject::new(prefix, &session_id);
6262
if let Err(e) = bridge
6363
.nats()
6464
.publish_with_headers(

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ where
3535
format!("Invalid session ID: {}", e),
3636
)
3737
})?;
38-
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::close(prefix, session_id.as_str());
38+
let prefix = bridge.config.acp_prefix_ref();
39+
let subject = session::agent::CloseSubject::new(prefix, &session_id);
4040

4141
let result = bridge
4242
.session_request::<CloseSessionRequest, CloseSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/ext_method.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
3535
})?;
3636

3737
let nats = bridge.nats();
38-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
38+
let subject = agent::ExtSubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
3939

4040
let result = nats::request_with_timeout::<N, ExtRequest, ExtResponse>(
4141
nats,

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject = agent::ext(bridge.config.acp_prefix(), method_name.as_str());
41+
let subject =
42+
agent::ExtNotifySubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
4243

4344
let publish_result = nats::publish(
4445
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ where
3535
format!("Invalid session ID: {}", e),
3636
)
3737
})?;
38-
let prefix = bridge.config.acp_prefix();
39-
let subject = session::agent::fork(prefix, session_id.as_str());
38+
let prefix = bridge.config.acp_prefix_ref();
39+
let subject = session::agent::ForkSubject::new(prefix, &session_id);
4040

4141
let result = bridge
4242
.session_request::<ForkSessionRequest, ForkSessionResponse>(

rsworkspace/crates/acp-nats/src/agent/initialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
2525
info!(client = %client_name, "Initialize request");
2626

2727
let nats = bridge.nats();
28-
let subject = agent::initialize(bridge.config.acp_prefix());
28+
let subject = agent::InitializeSubject::new(bridge.config.acp_prefix_ref());
2929

3030
let result = nats::request_with_timeout::<N, InitializeRequest, InitializeResponse>(
3131
nats,

rsworkspace/crates/acp-nats/src/agent/list_sessions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
1515
info!("List sessions request");
1616

1717
let nats = bridge.nats();
18-
let subject = agent::session_list(bridge.config.acp_prefix());
18+
let subject = agent::SessionListSubject::new(bridge.config.acp_prefix_ref());
1919

2020
let result = nats::request_with_timeout::<N, ListSessionsRequest, ListSessionsResponse>(
2121
nats,

0 commit comments

Comments
 (0)