11mod config;
22
3- use acp_nats:: { StdJsonSerialize , agent:: Bridge , client, nats , spawn_notification_forwarder} ;
3+ use acp_nats:: { StdJsonSerialize , agent:: Bridge , client, spawn_notification_forwarder} ;
44use agent_client_protocol:: { AgentSideConnection , SessionNotification } ;
5- use async_nats:: Client as NatsAsyncClient ;
65use std:: rc:: Rc ;
76use tracing:: { error, info} ;
8- use trogon_std:: env:: SystemEnv ;
9- use trogon_std:: fs:: SystemFs ;
107use trogon_std:: time:: SystemClock ;
118
12- use acp_telemetry:: ServiceName ;
9+ #[ cfg( not( coverage) ) ]
10+ use {
11+ acp_nats:: nats, acp_telemetry:: ServiceName , trogon_std:: env:: SystemEnv ,
12+ trogon_std:: fs:: SystemFs ,
13+ } ;
1314
15+ #[ cfg( not( coverage) ) ]
1416#[ tokio:: main]
1517async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
16- let config = config:: base_config ( & SystemEnv ) ?;
18+ let config = config:: base_config ( & trogon_std :: CliArgs :: < config :: Args > :: new ( ) , & SystemEnv ) ?;
1719 acp_telemetry:: init_logger (
1820 ServiceName :: AcpNatsStdio ,
1921 config. acp_prefix ( ) ,
@@ -27,9 +29,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2729 let nats_connect_timeout = acp_nats:: nats_connect_timeout ( & SystemEnv ) ;
2830 let nats_client = nats:: connect ( config. nats ( ) , nats_connect_timeout) . await ?;
2931
30- let local = tokio:: task:: LocalSet :: new ( ) ;
32+ let stdin = async_compat:: Compat :: new ( tokio:: io:: stdin ( ) ) ;
33+ let stdout = async_compat:: Compat :: new ( tokio:: io:: stdout ( ) ) ;
3134
32- let result = local. run_until ( run_bridge ( nats_client, & config) ) . await ;
35+ let local = tokio:: task:: LocalSet :: new ( ) ;
36+ let result = local
37+ . run_until ( run_bridge (
38+ nats_client,
39+ & config,
40+ stdout,
41+ stdin,
42+ acp_telemetry:: signal:: shutdown_signal ( ) ,
43+ ) )
44+ . await ;
3345
3446 if let Err ( ref e) = result {
3547 error ! ( error = %e, "ACP bridge stopped with error" ) ;
@@ -42,17 +54,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4254 result
4355}
4456
45- // `Rc` is intentional throughout this function: the ACP `Agent` trait is
46- // `?Send`, so the entire bridge runs on a `LocalSet` with `spawn_local`.
47- // Do not replace with `Arc` or move tasks to `tokio::spawn` — that would
48- // violate the `!Send` constraint.
49- async fn run_bridge (
50- nats_client : NatsAsyncClient ,
51- config : & acp_nats:: Config ,
52- ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
53- let stdin = async_compat:: Compat :: new ( tokio:: io:: stdin ( ) ) ;
54- let stdout = async_compat:: Compat :: new ( tokio:: io:: stdout ( ) ) ;
57+ #[ cfg( coverage) ]
58+ fn main ( ) { }
5559
60+ async fn run_bridge < N , W , R > (
61+ nats_client : N ,
62+ config : & acp_nats:: Config ,
63+ stdout : W ,
64+ stdin : R ,
65+ shutdown_signal : impl std:: future:: Future < Output = ( ) > ,
66+ ) -> Result < ( ) , Box < dyn std:: error:: Error > >
67+ where
68+ N : acp_nats:: RequestClient
69+ + acp_nats:: PublishClient
70+ + acp_nats:: FlushClient
71+ + acp_nats:: SubscribeClient
72+ + ' static ,
73+ W : futures:: AsyncWrite + Unpin + ' static ,
74+ R : futures:: AsyncRead + Unpin + ' static ,
75+ {
5676 let meter = acp_telemetry:: meter ( "acp-io-bridge-nats" ) ;
5777 let ( notification_tx, notification_rx) = tokio:: sync:: mpsc:: channel :: < SessionNotification > ( 64 ) ;
5878 let bridge = Rc :: new ( Bridge :: new (
@@ -106,7 +126,7 @@ async fn run_bridge(
106126 }
107127 }
108128 }
109- _ = acp_telemetry :: signal :: shutdown_signal( ) => {
129+ _ = shutdown_signal => {
110130 info!( "ACP bridge shutting down (signal received)" ) ;
111131 Ok ( ( ) )
112132 }
@@ -119,3 +139,72 @@ async fn run_bridge(
119139
120140 shutdown_result
121141}
142+
143+ #[ cfg( test) ]
144+ mod tests {
145+ use super :: * ;
146+ use trogon_nats:: AdvancedMockNatsClient ;
147+
148+ #[ tokio:: test]
149+ async fn run_bridge_shuts_down_on_signal ( ) {
150+ let mock = AdvancedMockNatsClient :: new ( ) ;
151+ let _sub = mock. inject_messages ( ) ;
152+ let config = acp_nats:: Config :: new (
153+ acp_nats:: AcpPrefix :: new ( "acp" ) . unwrap ( ) ,
154+ acp_nats:: NatsConfig {
155+ servers : vec ! [ "localhost:4222" . to_string( ) ] ,
156+ auth : trogon_nats:: NatsAuth :: None ,
157+ } ,
158+ ) ;
159+
160+ let ( reader, _writer) = tokio:: io:: duplex ( 1024 ) ;
161+ let ( _reader2, writer2) = tokio:: io:: duplex ( 1024 ) ;
162+ let stdin = async_compat:: Compat :: new ( reader) ;
163+ let stdout = async_compat:: Compat :: new ( writer2) ;
164+
165+ let local = tokio:: task:: LocalSet :: new ( ) ;
166+ let result = local
167+ . run_until ( run_bridge (
168+ mock,
169+ & config,
170+ stdout,
171+ stdin,
172+ std:: future:: ready ( ( ) ) ,
173+ ) )
174+ . await ;
175+
176+ assert ! ( result. is_ok( ) ) ;
177+ }
178+
179+ #[ tokio:: test]
180+ async fn run_bridge_exits_on_io_close ( ) {
181+ let mock = AdvancedMockNatsClient :: new ( ) ;
182+ let _sub = mock. inject_messages ( ) ;
183+ let config = acp_nats:: Config :: new (
184+ acp_nats:: AcpPrefix :: new ( "acp" ) . unwrap ( ) ,
185+ acp_nats:: NatsConfig {
186+ servers : vec ! [ "localhost:4222" . to_string( ) ] ,
187+ auth : trogon_nats:: NatsAuth :: None ,
188+ } ,
189+ ) ;
190+
191+ let ( reader, writer) = tokio:: io:: duplex ( 1024 ) ;
192+ let ( _reader2, writer2) = tokio:: io:: duplex ( 1024 ) ;
193+ drop ( writer) ;
194+ let stdin = async_compat:: Compat :: new ( reader) ;
195+ let stdout = async_compat:: Compat :: new ( writer2) ;
196+
197+ let local = tokio:: task:: LocalSet :: new ( ) ;
198+ let result = local
199+ . run_until ( run_bridge (
200+ mock,
201+ & config,
202+ stdout,
203+ stdin,
204+ std:: future:: pending ( ) ,
205+ ) )
206+ . await ;
207+
208+ assert ! ( result. is_ok( ) ) ;
209+ }
210+ }
0 commit comments