Skip to content

Commit ef26c8c

Browse files
authored
refactor(acp-nats): redesign NATS subject topology (#64)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent fe3f2e7 commit ef26c8c

File tree

18 files changed

+692
-882
lines changed

18 files changed

+692
-882
lines changed

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,8 @@ where
115115
N: SubscribeClient + PublishClient + FlushClient + Clone + 'static,
116116
A: Agent + 'static,
117117
{
118-
// TODO: These two wildcards overlap when session_id == "agent", causing duplicate
119-
// dispatch. A single {prefix}.> avoids duplicates but consumes client messages.
120-
// Revisit the subject topology to eliminate both problems.
121118
let global_wildcard = acp_nats::nats::agent::wildcards::all(prefix);
122-
let session_wildcard = acp_nats::nats::agent::wildcards::all_sessions(prefix);
119+
let session_wildcard = acp_nats::nats::session::wildcards::all_agent(prefix);
123120

124121
info!(
125122
global = %global_wildcard,
@@ -461,8 +458,8 @@ mod tests {
461458
#[tokio::test]
462459
async fn dispatch_cancel_is_notification_no_reply_published() {
463460
let (nats, agent) = dispatch(
464-
"acp.s1.agent.session.cancel",
465-
&CancelNotification::new("sess-1"),
461+
"acp.session.s1.agent.cancel",
462+
&CancelNotification::new("s1"),
466463
None,
467464
)
468465
.await;
@@ -496,8 +493,8 @@ mod tests {
496493
#[tokio::test]
497494
async fn dispatch_prompt_returns_stop_reason() {
498495
let (nats, _) = dispatch(
499-
"acp.s1.agent.session.prompt",
500-
&PromptRequest::new("sess-1", vec![]),
496+
"acp.session.s1.agent.prompt",
497+
&PromptRequest::new("s1", vec![]),
501498
Some("_INBOX.3"),
502499
)
503500
.await;
@@ -599,8 +596,8 @@ mod tests {
599596
#[tokio::test]
600597
async fn dispatch_session_load_publishes_response() {
601598
assert_dispatch_publishes(
602-
"acp.s1.agent.session.load",
603-
&LoadSessionRequest::new("sess-1", "/tmp"),
599+
"acp.session.s1.agent.load",
600+
&LoadSessionRequest::new("s1", "/tmp"),
604601
)
605602
.await;
606603
}
@@ -613,53 +610,53 @@ mod tests {
613610
#[tokio::test]
614611
async fn dispatch_set_session_mode_publishes_response() {
615612
assert_dispatch_publishes(
616-
"acp.s1.agent.session.set_mode",
617-
&SetSessionModeRequest::new("sess-1", "code"),
613+
"acp.session.s1.agent.set_mode",
614+
&SetSessionModeRequest::new("s1", "code"),
618615
)
619616
.await;
620617
}
621618

622619
#[tokio::test]
623620
async fn dispatch_set_session_config_option_publishes_response() {
624621
assert_dispatch_publishes(
625-
"acp.s1.agent.session.set_config_option",
626-
&SetSessionConfigOptionRequest::new("sess-1", "key", "val"),
622+
"acp.session.s1.agent.set_config_option",
623+
&SetSessionConfigOptionRequest::new("s1", "key", "val"),
627624
)
628625
.await;
629626
}
630627

631628
#[tokio::test]
632629
async fn dispatch_set_session_model_publishes_response() {
633630
assert_dispatch_publishes(
634-
"acp.s1.agent.session.set_model",
635-
&SetSessionModelRequest::new("sess-1", "gpt-4"),
631+
"acp.session.s1.agent.set_model",
632+
&SetSessionModelRequest::new("s1", "gpt-4"),
636633
)
637634
.await;
638635
}
639636

640637
#[tokio::test]
641638
async fn dispatch_fork_session_publishes_response() {
642639
assert_dispatch_publishes(
643-
"acp.s1.agent.session.fork",
644-
&ForkSessionRequest::new("sess-1", "/tmp"),
640+
"acp.session.s1.agent.fork",
641+
&ForkSessionRequest::new("s1", "/tmp"),
645642
)
646643
.await;
647644
}
648645

649646
#[tokio::test]
650647
async fn dispatch_resume_session_publishes_response() {
651648
assert_dispatch_publishes(
652-
"acp.s1.agent.session.resume",
653-
&ResumeSessionRequest::new("sess-1", "/tmp"),
649+
"acp.session.s1.agent.resume",
650+
&ResumeSessionRequest::new("s1", "/tmp"),
654651
)
655652
.await;
656653
}
657654

658655
#[tokio::test]
659656
async fn dispatch_close_session_publishes_response() {
660657
assert_dispatch_publishes(
661-
"acp.s1.agent.session.close",
662-
&CloseSessionRequest::new("sess-1"),
658+
"acp.session.s1.agent.close",
659+
&CloseSessionRequest::new("s1"),
663660
)
664661
.await;
665662
}
@@ -832,7 +829,7 @@ mod tests {
832829

833830
let subjects = nats.subscribed_to();
834831
assert!(subjects.contains(&"myprefix.agent.>".to_string()));
835-
assert!(subjects.contains(&"myprefix.*.agent.>".to_string()));
832+
assert!(subjects.contains(&"myprefix.session.*.agent.>".to_string()));
836833
})
837834
.await;
838835
}

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::cell::RefCell;
33
use crate::config::Config;
44
use crate::nats::{
55
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
6-
RetryPolicy, SubscribeClient, agent,
6+
RetryPolicy, SubscribeClient, session,
77
};
88
use crate::pending_prompt_waiters::PendingSessionPromptResponseWaiters;
99
use crate::telemetry::metrics::Metrics;
@@ -96,7 +96,7 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
9696
) {
9797
tokio::time::sleep(SESSION_READY_DELAY).await;
9898

99-
let subject = agent::ext_session_ready(prefix, &session_id.to_string());
99+
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
100100
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
101101

102102
let message = ExtSessionReady::new(session_id.clone());

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::Bridge;
2-
use crate::nats::{self, FlushClient, PublishClient, agent};
2+
use crate::nats::{self, FlushClient, PublishClient, session};
33
use crate::session_id::AcpSessionId;
44
use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
55
use tracing::{info, instrument, warn};
@@ -34,7 +34,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
3434
)
3535
})?;
3636

37-
let subject = agent::session_cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
37+
let subject = session::agent::cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
3838

3939
let publish_result = nats::publish(
4040
bridge.nats(),
@@ -58,7 +58,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
5858
}
5959

6060
let cancelled_subject =
61-
agent::session_cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
61+
session::agent::cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
6262
if let Err(e) = bridge
6363
.nats()
6464
.publish_with_headers(
@@ -116,8 +116,8 @@ mod tests {
116116

117117
let published = mock.published_messages();
118118
assert!(
119-
published.contains(&"acp.s1.agent.session.cancel".to_string()),
120-
"expected publish to acp.s1.agent.session.cancel, got: {:?}",
119+
published.contains(&"acp.session.s1.agent.cancel".to_string()),
120+
"expected publish to acp.session.s1.agent.cancel, got: {:?}",
121121
published
122122
);
123123
}
@@ -130,8 +130,8 @@ mod tests {
130130

131131
let published = mock.published_messages();
132132
assert!(
133-
published.contains(&"acp.s1.agent.session.cancelled".to_string()),
134-
"expected publish to acp.s1.agent.session.cancelled (prompt broadcast), got: {:?}",
133+
published.contains(&"acp.session.s1.agent.cancelled".to_string()),
134+
"expected publish to acp.session.s1.agent.cancelled (prompt broadcast), got: {:?}",
135135
published
136136
);
137137
}

rsworkspace/crates/acp-nats/src/agent/close_session.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, RequestClient, agent};
3+
use crate::nats::{self, RequestClient, session};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
66
use tracing::{info, instrument};
@@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed>(
2929
)
3030
})?;
3131
let nats = bridge.nats();
32-
let subject = agent::session_close(bridge.config.acp_prefix(), session_id.as_str());
32+
let subject = session::agent::close(bridge.config.acp_prefix(), session_id.as_str());
3333

3434
let result = nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
3535
nats,
@@ -61,7 +61,7 @@ mod tests {
6161
async fn close_session_forwards_request_and_returns_response() {
6262
let (mock, bridge) = mock_bridge();
6363
let expected = CloseSessionResponse::new();
64-
set_json_response(&mock, "acp.s1.agent.session.close", &expected);
64+
set_json_response(&mock, "acp.session.s1.agent.close", &expected);
6565

6666
let request = CloseSessionRequest::new("s1");
6767
let result = bridge.close_session(request).await;
@@ -82,7 +82,7 @@ mod tests {
8282
#[tokio::test]
8383
async fn close_session_returns_error_when_response_is_invalid_json() {
8484
let (mock, bridge) = mock_bridge();
85-
mock.set_response("acp.s1.agent.session.close", "not json".into());
85+
mock.set_response("acp.session.s1.agent.close", "not json".into());
8686

8787
let request = CloseSessionRequest::new("s1");
8888
let err = bridge.close_session(request).await.unwrap_err();
@@ -105,7 +105,7 @@ mod tests {
105105
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
106106
set_json_response(
107107
&mock,
108-
"acp.s1.agent.session.close",
108+
"acp.session.s1.agent.close",
109109
&CloseSessionResponse::new(),
110110
);
111111

rsworkspace/crates/acp-nats/src/agent/fork_session.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
66
use tracing::{Span, info, instrument};
@@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2929
)
3030
})?;
3131
let nats = bridge.nats();
32-
let subject = agent::session_fork(bridge.config.acp_prefix(), session_id.as_str());
32+
let subject = session::agent::fork(bridge.config.acp_prefix(), session_id.as_str());
3333

3434
let result = nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
3535
nats,
@@ -73,7 +73,7 @@ mod tests {
7373
let (mock, bridge) = mock_bridge();
7474
let new_session_id = SessionId::from("forked-session-1");
7575
let expected = ForkSessionResponse::new(new_session_id.clone());
76-
set_json_response(&mock, "acp.s1.agent.session.fork", &expected);
76+
set_json_response(&mock, "acp.session.s1.agent.fork", &expected);
7777

7878
let request = ForkSessionRequest::new("s1", ".");
7979
let result = bridge.fork_session(request).await;
@@ -97,7 +97,7 @@ mod tests {
9797
#[tokio::test]
9898
async fn fork_session_returns_error_when_response_is_invalid_json() {
9999
let (mock, bridge) = mock_bridge();
100-
mock.set_response("acp.s1.agent.session.fork", "not json".into());
100+
mock.set_response("acp.session.s1.agent.fork", "not json".into());
101101

102102
let request = ForkSessionRequest::new("s1", ".");
103103
let err = bridge.fork_session(request).await.unwrap_err();
@@ -121,7 +121,7 @@ mod tests {
121121
let new_session_id = SessionId::from("forked-session-1");
122122
set_json_response(
123123
&mock,
124-
"acp.s1.agent.session.fork",
124+
"acp.session.s1.agent.fork",
125125
&ForkSessionResponse::new(new_session_id),
126126
);
127127

@@ -132,8 +132,8 @@ mod tests {
132132
tokio::time::sleep(Duration::from_millis(300)).await;
133133
let published = mock.published_messages();
134134
assert!(
135-
published.contains(&"acp.forked-session-1.agent.ext.session.ready".to_string()),
136-
"expected publish to acp.forked-session-1.agent.ext.session.ready, got: {:?}",
135+
published.contains(&"acp.session.forked-session-1.agent.ext.ready".to_string()),
136+
"expected publish to acp.session.forked-session-1.agent.ext.ready, got: {:?}",
137137
published
138138
);
139139
}
@@ -143,7 +143,7 @@ mod tests {
143143
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
144144
set_json_response(
145145
&mock,
146-
"acp.s1.agent.session.fork",
146+
"acp.session.s1.agent.fork",
147147
&ForkSessionResponse::new("forked-1"),
148148
);
149149

@@ -184,7 +184,7 @@ mod tests {
184184
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
185185
set_json_response(
186186
&mock,
187-
"acp.s1.agent.session.fork",
187+
"acp.session.s1.agent.fork",
188188
&ForkSessionResponse::new("forked-1"),
189189
);
190190
mock.fail_publish_count(4);

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::map_nats_error;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
44
use crate::session_id::AcpSessionId;
55
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
66
use tracing::{info, instrument};
@@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2929
)
3030
})?;
3131
let nats = bridge.nats();
32-
let subject = agent::session_load(bridge.config.acp_prefix(), session_id.as_str());
32+
let subject = session::agent::load(bridge.config.acp_prefix(), session_id.as_str());
3333

3434
let result = nats::request_with_timeout::<N, LoadSessionRequest, LoadSessionResponse>(
3535
nats,
@@ -67,7 +67,7 @@ mod tests {
6767
async fn load_session_forwards_request_and_returns_response() {
6868
let (mock, bridge) = mock_bridge();
6969
let expected = LoadSessionResponse::new();
70-
set_json_response(&mock, "acp.s1.agent.session.load", &expected);
70+
set_json_response(&mock, "acp.session.s1.agent.load", &expected);
7171

7272
let request = LoadSessionRequest::new("s1", ".");
7373
let result = bridge.load_session(request).await;
@@ -90,7 +90,7 @@ mod tests {
9090
#[tokio::test]
9191
async fn load_session_returns_error_when_response_is_invalid_json() {
9292
let (mock, bridge) = mock_bridge();
93-
mock.set_response("acp.s1.agent.session.load", "not json".into());
93+
mock.set_response("acp.session.s1.agent.load", "not json".into());
9494

9595
let request = LoadSessionRequest::new("s1", ".");
9696
let err = bridge.load_session(request).await.unwrap_err();
@@ -104,7 +104,7 @@ mod tests {
104104
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
105105
set_json_response(
106106
&mock,
107-
"acp.s1.agent.session.load",
107+
"acp.session.s1.agent.load",
108108
&LoadSessionResponse::new(),
109109
);
110110

@@ -153,7 +153,7 @@ mod tests {
153153
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
154154
set_json_response(
155155
&mock,
156-
"acp.s1.agent.session.load",
156+
"acp.session.s1.agent.load",
157157
&LoadSessionResponse::new(),
158158
);
159159
mock.fail_publish_count(4);
@@ -181,7 +181,7 @@ mod tests {
181181
let (mock, bridge) = mock_bridge();
182182
set_json_response(
183183
&mock,
184-
"acp.s1.agent.session.load",
184+
"acp.session.s1.agent.load",
185185
&LoadSessionResponse::new(),
186186
);
187187

@@ -192,8 +192,8 @@ mod tests {
192192
tokio::time::sleep(Duration::from_millis(300)).await;
193193
let published = mock.published_messages();
194194
assert!(
195-
published.contains(&"acp.s1.agent.ext.session.ready".to_string()),
196-
"expected publish to acp.s1.agent.ext.session.ready, got: {:?}",
195+
published.contains(&"acp.session.s1.agent.ext.ready".to_string()),
196+
"expected publish to acp.session.s1.agent.ext.ready, got: {:?}",
197197
published
198198
);
199199
}

rsworkspace/crates/acp-nats/src/agent/new_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ mod tests {
177177
tokio::time::sleep(Duration::from_millis(300)).await;
178178
let published = mock.published_messages();
179179
assert!(
180-
published.contains(&"acp.test-session-1.agent.ext.session.ready".to_string()),
181-
"expected publish to acp.test-session-1.agent.ext.session.ready, got: {:?}",
180+
published.contains(&"acp.session.test-session-1.agent.ext.ready".to_string()),
181+
"expected publish to acp.session.test-session-1.agent.ext.ready, got: {:?}",
182182
published
183183
);
184184
}

0 commit comments

Comments
 (0)