Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions rsworkspace/crates/acp-nats-agent/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,8 @@ where
N: SubscribeClient + PublishClient + FlushClient + Clone + 'static,
A: Agent + 'static,
{
// TODO: These two wildcards overlap when session_id == "agent", causing duplicate
// dispatch. A single {prefix}.> avoids duplicates but consumes client messages.
// Revisit the subject topology to eliminate both problems.
let global_wildcard = acp_nats::nats::agent::wildcards::all(prefix);
let session_wildcard = acp_nats::nats::agent::wildcards::all_sessions(prefix);
let session_wildcard = acp_nats::nats::session::wildcards::all_agent(prefix);

info!(
global = %global_wildcard,
Expand Down Expand Up @@ -461,8 +458,8 @@ mod tests {
#[tokio::test]
async fn dispatch_cancel_is_notification_no_reply_published() {
let (nats, agent) = dispatch(
"acp.s1.agent.session.cancel",
&CancelNotification::new("sess-1"),
"acp.session.s1.agent.cancel",
&CancelNotification::new("s1"),
None,
Comment thread
yordis marked this conversation as resolved.
)
.await;
Expand Down Expand Up @@ -496,8 +493,8 @@ mod tests {
#[tokio::test]
async fn dispatch_prompt_returns_stop_reason() {
let (nats, _) = dispatch(
"acp.s1.agent.session.prompt",
&PromptRequest::new("sess-1", vec![]),
"acp.session.s1.agent.prompt",
&PromptRequest::new("s1", vec![]),
Some("_INBOX.3"),
)
.await;
Expand Down Expand Up @@ -599,8 +596,8 @@ mod tests {
#[tokio::test]
async fn dispatch_session_load_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.load",
&LoadSessionRequest::new("sess-1", "/tmp"),
"acp.session.s1.agent.load",
&LoadSessionRequest::new("s1", "/tmp"),
)
.await;
}
Expand All @@ -613,53 +610,53 @@ mod tests {
#[tokio::test]
async fn dispatch_set_session_mode_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.set_mode",
&SetSessionModeRequest::new("sess-1", "code"),
"acp.session.s1.agent.set_mode",
&SetSessionModeRequest::new("s1", "code"),
)
.await;
}

#[tokio::test]
async fn dispatch_set_session_config_option_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.set_config_option",
&SetSessionConfigOptionRequest::new("sess-1", "key", "val"),
"acp.session.s1.agent.set_config_option",
&SetSessionConfigOptionRequest::new("s1", "key", "val"),
)
.await;
}

#[tokio::test]
async fn dispatch_set_session_model_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.set_model",
&SetSessionModelRequest::new("sess-1", "gpt-4"),
"acp.session.s1.agent.set_model",
&SetSessionModelRequest::new("s1", "gpt-4"),
)
.await;
}

#[tokio::test]
async fn dispatch_fork_session_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.fork",
&ForkSessionRequest::new("sess-1", "/tmp"),
"acp.session.s1.agent.fork",
&ForkSessionRequest::new("s1", "/tmp"),
)
.await;
}

#[tokio::test]
async fn dispatch_resume_session_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.resume",
&ResumeSessionRequest::new("sess-1", "/tmp"),
"acp.session.s1.agent.resume",
&ResumeSessionRequest::new("s1", "/tmp"),
)
.await;
}

#[tokio::test]
async fn dispatch_close_session_publishes_response() {
assert_dispatch_publishes(
"acp.s1.agent.session.close",
&CloseSessionRequest::new("sess-1"),
"acp.session.s1.agent.close",
&CloseSessionRequest::new("s1"),
)
.await;
}
Expand Down Expand Up @@ -832,7 +829,7 @@ mod tests {

let subjects = nats.subscribed_to();
assert!(subjects.contains(&"myprefix.agent.>".to_string()));
assert!(subjects.contains(&"myprefix.*.agent.>".to_string()));
assert!(subjects.contains(&"myprefix.session.*.agent.>".to_string()));
})
.await;
}
Expand Down
4 changes: 2 additions & 2 deletions rsworkspace/crates/acp-nats/src/agent/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cell::RefCell;
use crate::config::Config;
use crate::nats::{
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
RetryPolicy, SubscribeClient, agent,
RetryPolicy, SubscribeClient, session,
};
use crate::pending_prompt_waiters::PendingSessionPromptResponseWaiters;
use crate::telemetry::metrics::Metrics;
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn publish_session_ready<N: PublishClient + FlushClient>(
) {
tokio::time::sleep(SESSION_READY_DELAY).await;

let subject = agent::ext_session_ready(prefix, &session_id.to_string());
let subject = session::agent::ext_ready(prefix, &session_id.to_string());
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");

let message = ExtSessionReady::new(session_id.clone());
Expand Down
14 changes: 7 additions & 7 deletions rsworkspace/crates/acp-nats/src/agent/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Bridge;
use crate::nats::{self, FlushClient, PublishClient, agent};
use crate::nats::{self, FlushClient, PublishClient, session};
use crate::session_id::AcpSessionId;
use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
use tracing::{info, instrument, warn};
Expand Down Expand Up @@ -34,7 +34,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
)
})?;

let subject = agent::session_cancel(bridge.config.acp_prefix(), &args.session_id.to_string());
let subject = session::agent::cancel(bridge.config.acp_prefix(), &args.session_id.to_string());

let publish_result = nats::publish(
bridge.nats(),
Expand All @@ -58,7 +58,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
}

let cancelled_subject =
agent::session_cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
session::agent::cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
if let Err(e) = bridge
.nats()
.publish_with_headers(
Expand Down Expand Up @@ -116,8 +116,8 @@ mod tests {

let published = mock.published_messages();
assert!(
published.contains(&"acp.s1.agent.session.cancel".to_string()),
"expected publish to acp.s1.agent.session.cancel, got: {:?}",
published.contains(&"acp.session.s1.agent.cancel".to_string()),
"expected publish to acp.session.s1.agent.cancel, got: {:?}",
published
);
}
Expand All @@ -130,8 +130,8 @@ mod tests {

let published = mock.published_messages();
assert!(
published.contains(&"acp.s1.agent.session.cancelled".to_string()),
"expected publish to acp.s1.agent.session.cancelled (prompt broadcast), got: {:?}",
published.contains(&"acp.session.s1.agent.cancelled".to_string()),
"expected publish to acp.session.s1.agent.cancelled (prompt broadcast), got: {:?}",
published
);
}
Expand Down
10 changes: 5 additions & 5 deletions rsworkspace/crates/acp-nats/src/agent/close_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Bridge;
use crate::error::map_nats_error;
use crate::nats::{self, RequestClient, agent};
use crate::nats::{self, RequestClient, session};
use crate::session_id::AcpSessionId;
use agent_client_protocol::{CloseSessionRequest, CloseSessionResponse, Error, ErrorCode, Result};
use tracing::{info, instrument};
Expand Down Expand Up @@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed>(
)
})?;
let nats = bridge.nats();
let subject = agent::session_close(bridge.config.acp_prefix(), session_id.as_str());
let subject = session::agent::close(bridge.config.acp_prefix(), session_id.as_str());

let result = nats::request_with_timeout::<N, CloseSessionRequest, CloseSessionResponse>(
nats,
Expand Down Expand Up @@ -61,7 +61,7 @@ mod tests {
async fn close_session_forwards_request_and_returns_response() {
let (mock, bridge) = mock_bridge();
let expected = CloseSessionResponse::new();
set_json_response(&mock, "acp.s1.agent.session.close", &expected);
set_json_response(&mock, "acp.session.s1.agent.close", &expected);

let request = CloseSessionRequest::new("s1");
let result = bridge.close_session(request).await;
Expand All @@ -82,7 +82,7 @@ mod tests {
#[tokio::test]
async fn close_session_returns_error_when_response_is_invalid_json() {
let (mock, bridge) = mock_bridge();
mock.set_response("acp.s1.agent.session.close", "not json".into());
mock.set_response("acp.session.s1.agent.close", "not json".into());

let request = CloseSessionRequest::new("s1");
let err = bridge.close_session(request).await.unwrap_err();
Expand All @@ -105,7 +105,7 @@ mod tests {
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
set_json_response(
&mock,
"acp.s1.agent.session.close",
"acp.session.s1.agent.close",
&CloseSessionResponse::new(),
);

Expand Down
18 changes: 9 additions & 9 deletions rsworkspace/crates/acp-nats/src/agent/fork_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Bridge;
use crate::error::map_nats_error;
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
use crate::session_id::AcpSessionId;
use agent_client_protocol::{Error, ErrorCode, ForkSessionRequest, ForkSessionResponse, Result};
use tracing::{Span, info, instrument};
Expand Down Expand Up @@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
)
})?;
let nats = bridge.nats();
let subject = agent::session_fork(bridge.config.acp_prefix(), session_id.as_str());
let subject = session::agent::fork(bridge.config.acp_prefix(), session_id.as_str());

let result = nats::request_with_timeout::<N, ForkSessionRequest, ForkSessionResponse>(
nats,
Expand Down Expand Up @@ -73,7 +73,7 @@ mod tests {
let (mock, bridge) = mock_bridge();
let new_session_id = SessionId::from("forked-session-1");
let expected = ForkSessionResponse::new(new_session_id.clone());
set_json_response(&mock, "acp.s1.agent.session.fork", &expected);
set_json_response(&mock, "acp.session.s1.agent.fork", &expected);

let request = ForkSessionRequest::new("s1", ".");
let result = bridge.fork_session(request).await;
Expand All @@ -97,7 +97,7 @@ mod tests {
#[tokio::test]
async fn fork_session_returns_error_when_response_is_invalid_json() {
let (mock, bridge) = mock_bridge();
mock.set_response("acp.s1.agent.session.fork", "not json".into());
mock.set_response("acp.session.s1.agent.fork", "not json".into());

let request = ForkSessionRequest::new("s1", ".");
let err = bridge.fork_session(request).await.unwrap_err();
Expand All @@ -121,7 +121,7 @@ mod tests {
let new_session_id = SessionId::from("forked-session-1");
set_json_response(
&mock,
"acp.s1.agent.session.fork",
"acp.session.s1.agent.fork",
&ForkSessionResponse::new(new_session_id),
);

Expand All @@ -132,8 +132,8 @@ mod tests {
tokio::time::sleep(Duration::from_millis(300)).await;
let published = mock.published_messages();
assert!(
published.contains(&"acp.forked-session-1.agent.ext.session.ready".to_string()),
"expected publish to acp.forked-session-1.agent.ext.session.ready, got: {:?}",
published.contains(&"acp.session.forked-session-1.agent.ext.ready".to_string()),
"expected publish to acp.session.forked-session-1.agent.ext.ready, got: {:?}",
published
);
}
Expand All @@ -143,7 +143,7 @@ mod tests {
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
set_json_response(
&mock,
"acp.s1.agent.session.fork",
"acp.session.s1.agent.fork",
&ForkSessionResponse::new("forked-1"),
);

Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
set_json_response(
&mock,
"acp.s1.agent.session.fork",
"acp.session.s1.agent.fork",
&ForkSessionResponse::new("forked-1"),
);
mock.fail_publish_count(4);
Expand Down
18 changes: 9 additions & 9 deletions rsworkspace/crates/acp-nats/src/agent/load_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Bridge;
use crate::error::map_nats_error;
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
use crate::nats::{self, FlushClient, PublishClient, RequestClient, session};
use crate::session_id::AcpSessionId;
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
use tracing::{info, instrument};
Expand Down Expand Up @@ -29,7 +29,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
)
})?;
let nats = bridge.nats();
let subject = agent::session_load(bridge.config.acp_prefix(), session_id.as_str());
let subject = session::agent::load(bridge.config.acp_prefix(), session_id.as_str());

let result = nats::request_with_timeout::<N, LoadSessionRequest, LoadSessionResponse>(
nats,
Expand Down Expand Up @@ -67,7 +67,7 @@ mod tests {
async fn load_session_forwards_request_and_returns_response() {
let (mock, bridge) = mock_bridge();
let expected = LoadSessionResponse::new();
set_json_response(&mock, "acp.s1.agent.session.load", &expected);
set_json_response(&mock, "acp.session.s1.agent.load", &expected);

let request = LoadSessionRequest::new("s1", ".");
let result = bridge.load_session(request).await;
Expand All @@ -90,7 +90,7 @@ mod tests {
#[tokio::test]
async fn load_session_returns_error_when_response_is_invalid_json() {
let (mock, bridge) = mock_bridge();
mock.set_response("acp.s1.agent.session.load", "not json".into());
mock.set_response("acp.session.s1.agent.load", "not json".into());

let request = LoadSessionRequest::new("s1", ".");
let err = bridge.load_session(request).await.unwrap_err();
Expand All @@ -104,7 +104,7 @@ mod tests {
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
set_json_response(
&mock,
"acp.s1.agent.session.load",
"acp.session.s1.agent.load",
&LoadSessionResponse::new(),
);

Expand Down Expand Up @@ -153,7 +153,7 @@ mod tests {
let (mock, bridge, exporter, provider) = mock_bridge_with_metrics();
set_json_response(
&mock,
"acp.s1.agent.session.load",
"acp.session.s1.agent.load",
&LoadSessionResponse::new(),
);
mock.fail_publish_count(4);
Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
let (mock, bridge) = mock_bridge();
set_json_response(
&mock,
"acp.s1.agent.session.load",
"acp.session.s1.agent.load",
&LoadSessionResponse::new(),
);

Expand All @@ -192,8 +192,8 @@ mod tests {
tokio::time::sleep(Duration::from_millis(300)).await;
let published = mock.published_messages();
assert!(
published.contains(&"acp.s1.agent.ext.session.ready".to_string()),
"expected publish to acp.s1.agent.ext.session.ready, got: {:?}",
published.contains(&"acp.session.s1.agent.ext.ready".to_string()),
"expected publish to acp.session.s1.agent.ext.ready, got: {:?}",
published
);
}
Expand Down
4 changes: 2 additions & 2 deletions rsworkspace/crates/acp-nats/src/agent/new_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ mod tests {
tokio::time::sleep(Duration::from_millis(300)).await;
let published = mock.published_messages();
assert!(
published.contains(&"acp.test-session-1.agent.ext.session.ready".to_string()),
"expected publish to acp.test-session-1.agent.ext.session.ready, got: {:?}",
published.contains(&"acp.session.test-session-1.agent.ext.ready".to_string()),
"expected publish to acp.session.test-session-1.agent.ext.ready, got: {:?}",
published
);
}
Expand Down
Loading
Loading