Skip to content

Commit 8bf0ed2

Browse files
committed
feat(acp-nats): turn Bridge into protocol translator (ACP ↔ NATS)
Bridge no longer executes prompts — it publishes to a runner over NATS and streams PromptEvents back to the editor.
1 parent 7e0002a commit 8bf0ed2

23 files changed

Lines changed: 728 additions & 1976 deletions

rsworkspace/Cargo.lock

Lines changed: 259 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-nats-stdio/src/main.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod config;
22

3-
use acp_nats::{StdJsonSerialize, agent::Bridge, client, nats};
4-
use agent_client_protocol::AgentSideConnection;
3+
use acp_nats::{StdJsonSerialize, agent::Bridge, client, nats, spawn_notification_forwarder};
4+
use agent_client_protocol::{AgentSideConnection, SessionNotification};
55
use async_nats::Client as NatsAsyncClient;
66
use std::rc::Rc;
77
use tracing::{error, info};
@@ -54,11 +54,13 @@ async fn run_bridge(
5454
let stdout = async_compat::Compat::new(tokio::io::stdout());
5555

5656
let meter = acp_telemetry::meter("acp-io-bridge-nats");
57+
let (notification_tx, notification_rx) = tokio::sync::mpsc::channel::<SessionNotification>(64);
5758
let bridge = Rc::new(Bridge::new(
5859
nats_client.clone(),
5960
SystemClock,
6061
&meter,
6162
config.clone(),
63+
notification_tx,
6264
));
6365

6466
let (connection, io_task) = AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
@@ -67,6 +69,8 @@ async fn run_bridge(
6769

6870
let connection = Rc::new(connection);
6971

72+
spawn_notification_forwarder(connection.clone(), notification_rx);
73+
7074
let client_connection = connection.clone();
7175
let bridge_for_client = bridge.clone();
7276
let mut client_task = tokio::task::spawn_local(client::run(

rsworkspace/crates/acp-nats-ws/src/connection.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use acp_nats::{StdJsonSerialize, agent::Bridge, client};
2-
use agent_client_protocol::AgentSideConnection;
1+
use acp_nats::{StdJsonSerialize, agent::Bridge, client, spawn_notification_forwarder};
2+
use agent_client_protocol::{AgentSideConnection, SessionNotification};
33
use axum::extract::ws::{Message, WebSocket};
44
use futures_util::stream::{SplitSink, SplitStream};
55
use futures_util::{SinkExt, StreamExt};
@@ -34,11 +34,13 @@ pub async fn handle<N>(
3434
let outgoing = async_compat::Compat::new(agent_write);
3535

3636
let meter = acp_telemetry::meter("acp-nats-ws");
37+
let (notification_tx, notification_rx) = tokio::sync::mpsc::channel::<SessionNotification>(64);
3738
let bridge = Rc::new(Bridge::new(
3839
nats_client.clone(),
3940
SystemClock,
4041
&meter,
4142
config,
43+
notification_tx,
4244
));
4345

4446
let (connection, io_task) =
@@ -48,6 +50,8 @@ pub async fn handle<N>(
4850

4951
let connection = Rc::new(connection);
5052

53+
spawn_notification_forwarder(connection.clone(), notification_rx);
54+
5155
let recv_pump = tokio::task::spawn_local(run_recv_pump(ws_receiver, ws_recv_write));
5256
let send_pump = tokio::task::spawn_local(run_send_pump(ws_sender, ws_send_read));
5357

rsworkspace/crates/acp-nats/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@ edition = "2024"
77
workspace = true
88

99
[dependencies]
10-
agent-client-protocol = "0.9.4"
10+
agent-client-protocol = { version = "0.9.4", features = ["unstable_session_list", "unstable_session_model", "unstable_session_fork", "unstable_session_resume", "unstable_session_usage"] }
11+
opentelemetry = "0.31.0"
1112
async-nats = "0.45.0"
1213
async-trait = "0.1.89"
1314
bytes = "1.9"
1415
futures = "0.3"
15-
opentelemetry = "0.31.0"
1616
serde = { version = "1.0.228", features = ["derive"] }
1717
serde_json = "1.0.149"
1818
tokio = { version = "1.49.0", features = ["rt", "macros", "sync", "time"] }
1919
tracing = "0.1.44"
20+
uuid = { version = "1", features = ["v4"] }
2021

2122
trogon-nats = { path = "../trogon-nats" }
2223
trogon-std = { path = "../trogon-std" }

rsworkspace/crates/acp-nats/src/agent/authenticate.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::error::AGENT_UNAVAILABLE;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, RequestClient, agent};
44
use agent_client_protocol::{AuthenticateRequest, AuthenticateResponse, Error, ErrorCode, Result};
55
use tracing::{info, instrument, warn};
66
use trogon_nats::NatsError;
@@ -51,7 +51,7 @@ fn map_authenticate_error(e: NatsError) -> Error {
5151
skip(bridge, args),
5252
fields(method_id = %args.method_id)
5353
)]
54-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
54+
pub async fn handle<N: RequestClient, C: GetElapsed>(
5555
bridge: &Bridge<N, C>,
5656
args: AuthenticateRequest,
5757
) -> Result<AuthenticateResponse> {
@@ -150,6 +150,7 @@ mod tests {
150150
trogon_std::time::SystemClock,
151151
&meter,
152152
Config::for_test("acp"),
153+
tokio::sync::mpsc::channel(1).0,
153154
);
154155
(mock, bridge, exporter, provider)
155156
}
@@ -164,6 +165,7 @@ mod tests {
164165
trogon_std::time::SystemClock,
165166
&opentelemetry::global::meter("acp-nats-test"),
166167
Config::for_test("acp"),
168+
tokio::sync::mpsc::channel(1).0,
167169
);
168170
(mock, bridge)
169171
}

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 34 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use super::Bridge;
2-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
2+
use crate::nats::{self, FlushClient, PublishClient, agent};
33
use crate::session_id::AcpSessionId;
4-
use agent_client_protocol::{
5-
CancelNotification, Error, ErrorCode, PromptResponse, Result, StopReason,
6-
};
4+
use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
75
use tracing::{info, instrument, warn};
86
use trogon_std::time::GetElapsed;
97

@@ -17,7 +15,7 @@ use trogon_std::time::GetElapsed;
1715
skip(bridge, args),
1816
fields(session_id = %args.session_id)
1917
)]
20-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
18+
pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
2119
bridge: &Bridge<N, C>,
2220
args: CancelNotification,
2321
) -> Result<()> {
@@ -59,15 +57,19 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
5957
.record_error("cancel", "cancel_publish_failed");
6058
}
6159

62-
bridge
63-
.cancelled_sessions
64-
.mark_cancelled(args.session_id.clone(), &bridge.clock);
65-
bridge
66-
.pending_session_prompt_responses
67-
.cancel_waiter_for_session(
68-
&args.session_id,
69-
Ok(PromptResponse::new(StopReason::Cancelled)),
70-
);
60+
let cancelled_subject =
61+
agent::session_cancelled(bridge.config.acp_prefix(), &args.session_id.to_string());
62+
if let Err(e) = bridge
63+
.nats()
64+
.publish_with_headers(
65+
cancelled_subject,
66+
async_nats::HeaderMap::new(),
67+
bytes::Bytes::new(),
68+
)
69+
.await
70+
{
71+
warn!(session_id = %args.session_id, error = %e, "Failed to publish session_cancelled broadcast");
72+
}
7173

7274
bridge.metrics.record_request(
7375
"cancel",
@@ -82,7 +84,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8284
mod tests {
8385
use super::Bridge;
8486
use crate::config::Config;
85-
use agent_client_protocol::{Agent, CancelNotification, ErrorCode, SessionId, StopReason};
87+
use agent_client_protocol::{Agent, CancelNotification, ErrorCode};
8688
use opentelemetry::Value;
8789
use opentelemetry::metrics::MeterProvider;
8890
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
@@ -103,6 +105,7 @@ mod tests {
103105
trogon_std::time::SystemClock,
104106
&opentelemetry::global::meter("acp-nats-test"),
105107
Config::for_test("acp"),
108+
tokio::sync::mpsc::channel(1).0,
106109
);
107110
(mock, bridge)
108111
}
@@ -126,6 +129,7 @@ mod tests {
126129
trogon_std::time::SystemClock,
127130
&meter,
128131
Config::for_test("acp"),
132+
tokio::sync::mpsc::channel(1).0,
129133
);
130134
(mock, bridge, exporter, provider)
131135
}
@@ -142,6 +146,7 @@ mod tests {
142146
clock.clone(),
143147
&opentelemetry::global::meter("acp-nats-test"),
144148
Config::for_test("acp"),
149+
tokio::sync::mpsc::channel(1).0,
145150
);
146151
(mock, clock, bridge)
147152
}
@@ -228,6 +233,20 @@ mod tests {
228233
);
229234
}
230235

236+
#[tokio::test]
237+
async fn cancel_also_publishes_session_cancelled_broadcast() {
238+
let (mock, bridge) = mock_bridge();
239+
240+
let _ = bridge.cancel(CancelNotification::new("s1")).await;
241+
242+
let published = mock.published_messages();
243+
assert!(
244+
published.contains(&"acp.s1.agent.session.cancelled".to_string()),
245+
"expected publish to acp.s1.agent.session.cancelled (prompt broadcast), got: {:?}",
246+
published
247+
);
248+
}
249+
231250
#[tokio::test]
232251
async fn cancel_validates_session_id() {
233252
let (_mock, bridge) = mock_bridge();
@@ -337,49 +356,6 @@ mod tests {
337356
provider.shutdown().unwrap();
338357
}
339358

340-
#[tokio::test]
341-
async fn cancel_marks_session_as_cancelled() {
342-
let (_mock, _clock, bridge) = mock_bridge_with_clock();
343-
let session_id = "cancel-session-001";
344-
345-
assert!(
346-
bridge
347-
.cancelled_sessions
348-
.take_if_cancelled(&session_id.into(), &bridge.clock)
349-
.is_none()
350-
);
351-
352-
let notification = CancelNotification::new(session_id);
353-
bridge.cancel(notification).await.unwrap();
354-
355-
assert!(
356-
bridge
357-
.cancelled_sessions
358-
.take_if_cancelled(&session_id.into(), &bridge.clock)
359-
.is_some()
360-
);
361-
}
362-
363-
#[tokio::test]
364-
async fn cancel_resolves_pending_prompt_waiter_with_cancelled() {
365-
let (_mock, _clock, bridge) = mock_bridge_with_clock();
366-
let session_id: SessionId = "cancel-session-002".into();
367-
368-
let (rx, _guard, _token) = bridge
369-
.pending_session_prompt_responses
370-
.register_waiter(session_id.clone())
371-
.unwrap();
372-
373-
let notification = CancelNotification::new(session_id.clone());
374-
bridge.cancel(notification).await.unwrap();
375-
376-
let response = rx
377-
.await
378-
.expect("Should receive cancelled response")
379-
.expect("Prompt waiter should receive success response");
380-
assert_eq!(response.stop_reason, StopReason::Cancelled);
381-
}
382-
383359
#[tokio::test]
384360
async fn cancel_publishes_to_nats() {
385361
let (mock, _clock, bridge) = mock_bridge_with_clock();
@@ -395,43 +371,4 @@ mod tests {
395371
published
396372
);
397373
}
398-
399-
#[tokio::test]
400-
async fn cancel_session_evicts_expired_on_mark() {
401-
let (_mock, clock, bridge) = mock_bridge_with_clock();
402-
403-
let session_old: SessionId = "old-session".into();
404-
let session_new: SessionId = "new-session".into();
405-
406-
bridge
407-
.cancelled_sessions
408-
.mark_cancelled(session_old.clone(), &bridge.clock);
409-
410-
clock.advance(Duration::from_secs(301));
411-
412-
for idx in 0..15 {
413-
let filler_session: SessionId = format!("filler-{idx}").into();
414-
bridge
415-
.cancelled_sessions
416-
.mark_cancelled(filler_session, &bridge.clock);
417-
}
418-
419-
bridge
420-
.cancelled_sessions
421-
.mark_cancelled(session_new.clone(), &bridge.clock);
422-
423-
assert!(
424-
bridge
425-
.cancelled_sessions
426-
.take_if_cancelled(&session_old, &bridge.clock)
427-
.is_none()
428-
);
429-
430-
assert!(
431-
bridge
432-
.cancelled_sessions
433-
.take_if_cancelled(&session_new, &bridge.clock)
434-
.is_some()
435-
);
436-
}
437374
}

rsworkspace/crates/acp-nats/src/agent/ext_method.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::Bridge;
22
use crate::error::AGENT_UNAVAILABLE;
33
use crate::ext_method_name::ExtMethodName;
4-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
4+
use crate::nats::{self, RequestClient, agent};
55
use agent_client_protocol::{Error, ErrorCode, ExtRequest, ExtResponse, Result};
66
use tracing::{info, instrument, warn};
77
use trogon_nats::NatsError;
@@ -52,7 +52,7 @@ fn map_ext_method_error(e: NatsError) -> Error {
5252
skip(bridge, args),
5353
fields(method = %args.method)
5454
)]
55-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
55+
pub async fn handle<N: RequestClient, C: GetElapsed>(
5656
bridge: &Bridge<N, C>,
5757
args: ExtRequest,
5858
) -> Result<ExtResponse> {
@@ -121,6 +121,7 @@ mod tests {
121121
trogon_std::time::SystemClock,
122122
&opentelemetry::global::meter("acp-nats-test"),
123123
Config::for_test("acp"),
124+
tokio::sync::mpsc::channel(1).0,
124125
);
125126
(mock, bridge)
126127
}
@@ -144,6 +145,7 @@ mod tests {
144145
trogon_std::time::SystemClock,
145146
&meter,
146147
Config::for_test("acp"),
148+
tokio::sync::mpsc::channel(1).0,
147149
);
148150
(mock, bridge, exporter, provider)
149151
}

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Bridge;
22
use crate::ext_method_name::ExtMethodName;
3-
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
3+
use crate::nats::{self, FlushClient, PublishClient, agent};
44
use agent_client_protocol::{Error, ErrorCode, ExtNotification, Result};
55
use tracing::{info, instrument, warn};
66
use trogon_std::time::GetElapsed;
@@ -15,7 +15,7 @@ use trogon_std::time::GetElapsed;
1515
skip(bridge, args),
1616
fields(method = %args.method)
1717
)]
18-
pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>(
18+
pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed>(
1919
bridge: &Bridge<N, C>,
2020
args: ExtNotification,
2121
) -> Result<()> {
@@ -95,6 +95,7 @@ mod tests {
9595
trogon_std::time::SystemClock,
9696
&opentelemetry::global::meter("acp-nats-test"),
9797
Config::for_test("acp"),
98+
tokio::sync::mpsc::channel(1).0,
9899
);
99100
(mock, bridge)
100101
}
@@ -118,6 +119,7 @@ mod tests {
118119
trogon_std::time::SystemClock,
119120
&meter,
120121
Config::for_test("acp"),
122+
tokio::sync::mpsc::channel(1).0,
121123
);
122124
(mock, bridge, exporter, provider)
123125
}

0 commit comments

Comments
 (0)