Skip to content

Commit 3935f46

Browse files
committed
feat(acp-nats-stdio): add stdio-to-NATS bridge binary crate
- New acp-nats-stdio crate bridging ACP agent-client-protocol over stdio to NATS - CLI config with --acp-prefix arg and env var fallback - Graceful shutdown on SIGINT/SIGTERM - Full OpenTelemetry telemetry (traces, metrics, logs) with file logging fallback Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 1dda09d commit 3935f46

File tree

9 files changed

+1482
-4
lines changed

9 files changed

+1482
-4
lines changed

rsworkspace/Cargo.lock

Lines changed: 800 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.4"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
16+
trogon-std = { path = "../trogon-std" }
17+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal", "io-std"] }
18+
tracing = "0.1.44"
19+
tracing-opentelemetry = "0.32.1"
20+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
21+
22+
[dev-dependencies]
23+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
use acp_nats::{AcpPrefix, AcpPrefixError, Config, NatsConfig};
2+
use clap::Parser;
3+
use std::time::Duration;
4+
use tracing::warn;
5+
use trogon_std::env::ReadEnv;
6+
7+
const MIN_TIMEOUT_SECS: u64 = 1;
8+
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;
9+
10+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
11+
const ENV_OPERATION_TIMEOUT_SECS: &str = "ACP_OPERATION_TIMEOUT_SECS";
12+
const ENV_PROMPT_TIMEOUT_SECS: &str = "ACP_PROMPT_TIMEOUT_SECS";
13+
const ENV_CONNECT_TIMEOUT_SECS: &str = "ACP_NATS_CONNECT_TIMEOUT_SECS";
14+
const DEFAULT_ACP_PREFIX: &str = "acp";
15+
16+
#[derive(Parser, Debug)]
17+
#[command(name = "acp-nats-stdio")]
18+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
19+
pub struct Args {
20+
#[arg(long = "acp-prefix")]
21+
pub acp_prefix: Option<String>,
22+
}
23+
24+
pub fn from_env_with_provider<E: ReadEnv>(
25+
env_provider: &E,
26+
) -> Result<Config, AcpPrefixError> {
27+
let args = Args::parse();
28+
from_args(args, env_provider)
29+
}
30+
31+
pub fn from_args<E: ReadEnv>(
32+
args: Args,
33+
env_provider: &E,
34+
) -> Result<Config, AcpPrefixError> {
35+
let raw_prefix = args
36+
.acp_prefix
37+
.or_else(|| env_provider.var(ENV_ACP_PREFIX).ok())
38+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string());
39+
let prefix = AcpPrefix::new(raw_prefix)?;
40+
let mut config = Config::with_prefix(prefix, NatsConfig::from_env(env_provider));
41+
42+
if let Ok(raw) = env_provider.var(ENV_OPERATION_TIMEOUT_SECS) {
43+
match raw.parse::<u64>() {
44+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
45+
config = config.with_operation_timeout(Duration::from_secs(secs));
46+
}
47+
Ok(secs) => {
48+
warn!(
49+
"{ENV_OPERATION_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
50+
);
51+
}
52+
Err(_) => {
53+
warn!("{ENV_OPERATION_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
54+
}
55+
}
56+
}
57+
58+
if let Ok(raw) = env_provider.var(ENV_PROMPT_TIMEOUT_SECS) {
59+
match raw.parse::<u64>() {
60+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
61+
config = config.with_prompt_timeout(Duration::from_secs(secs));
62+
}
63+
Ok(secs) => {
64+
warn!(
65+
"{ENV_PROMPT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
66+
);
67+
}
68+
Err(_) => {
69+
warn!("{ENV_PROMPT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
70+
}
71+
}
72+
}
73+
74+
Ok(config)
75+
}
76+
77+
pub fn nats_connect_timeout<E: ReadEnv>(env_provider: &E) -> Duration {
78+
let default = Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS);
79+
80+
match env_provider.var(ENV_CONNECT_TIMEOUT_SECS) {
81+
Ok(raw) => match raw.parse::<u64>() {
82+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => Duration::from_secs(secs),
83+
Ok(secs) => {
84+
warn!(
85+
"{ENV_CONNECT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
86+
);
87+
default
88+
}
89+
Err(_) => {
90+
warn!("{ENV_CONNECT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
91+
default
92+
}
93+
},
94+
Err(_) => default,
95+
}
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
use trogon_std::env::InMemoryEnv;
102+
103+
fn config_from_env(env: &InMemoryEnv) -> Config {
104+
let args = Args { acp_prefix: None };
105+
from_args(args, env).unwrap()
106+
}
107+
108+
#[test]
109+
fn test_default_config() {
110+
let env = InMemoryEnv::new();
111+
let config = config_from_env(&env);
112+
assert_eq!(config.acp_prefix(), DEFAULT_ACP_PREFIX);
113+
assert_eq!(config.nats().servers, vec!["localhost:4222"]);
114+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::None));
115+
}
116+
117+
#[test]
118+
fn test_acp_prefix_from_env_provider() {
119+
let env = InMemoryEnv::new();
120+
env.set("ACP_PREFIX", "custom-prefix");
121+
let config = config_from_env(&env);
122+
assert_eq!(config.acp_prefix(), "custom-prefix");
123+
}
124+
125+
#[test]
126+
fn test_acp_prefix_from_args() {
127+
let env = InMemoryEnv::new();
128+
let args = Args {
129+
acp_prefix: Some("cli-prefix".to_string()),
130+
};
131+
let config = from_args(args, &env).unwrap();
132+
assert_eq!(config.acp_prefix(), "cli-prefix");
133+
}
134+
135+
#[test]
136+
fn test_args_override_env() {
137+
let env = InMemoryEnv::new();
138+
env.set("ACP_PREFIX", "env-prefix");
139+
let args = Args {
140+
acp_prefix: Some("cli-prefix".to_string()),
141+
};
142+
let config = from_args(args, &env).unwrap();
143+
assert_eq!(config.acp_prefix(), "cli-prefix");
144+
}
145+
146+
#[test]
147+
fn test_nats_config_from_env() {
148+
let env = InMemoryEnv::new();
149+
env.set("NATS_URL", "host1:4222,host2:4222");
150+
env.set("NATS_TOKEN", "my-token");
151+
let config = config_from_env(&env);
152+
assert_eq!(config.nats().servers, vec!["host1:4222", "host2:4222"]);
153+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
154+
}
155+
156+
#[test]
157+
fn test_nats_connect_timeout_from_env() {
158+
let env = InMemoryEnv::new();
159+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS));
160+
161+
env.set(ENV_CONNECT_TIMEOUT_SECS, "15");
162+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(15));
163+
env.set(ENV_CONNECT_TIMEOUT_SECS, "0");
164+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS));
165+
env.set(ENV_CONNECT_TIMEOUT_SECS, "not-a-number");
166+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS));
167+
}
168+
169+
#[test]
170+
fn test_operation_timeout_invalid_env_is_ignored() {
171+
let env = InMemoryEnv::new();
172+
let default = config_from_env(&env);
173+
let default_timeout = default.operation_timeout();
174+
175+
env.set("ACP_OPERATION_TIMEOUT_SECS", "not-a-number");
176+
let invalid = config_from_env(&env);
177+
assert_eq!(invalid.operation_timeout(), default_timeout);
178+
}
179+
180+
#[test]
181+
fn test_operation_timeout_under_min_is_ignored() {
182+
let env = InMemoryEnv::new();
183+
let default = config_from_env(&env);
184+
let default_timeout = default.operation_timeout();
185+
186+
env.set("ACP_OPERATION_TIMEOUT_SECS", "0");
187+
let ignored = config_from_env(&env);
188+
assert_eq!(ignored.operation_timeout(), default_timeout);
189+
}
190+
191+
#[test]
192+
fn test_prompt_timeout_invalid_env_is_ignored() {
193+
let env = InMemoryEnv::new();
194+
let default = config_from_env(&env);
195+
let default_timeout = default.prompt_timeout();
196+
197+
env.set("ACP_PROMPT_TIMEOUT_SECS", "not-a-number");
198+
let invalid = config_from_env(&env);
199+
assert_eq!(invalid.prompt_timeout(), default_timeout);
200+
}
201+
202+
#[test]
203+
fn test_prompt_timeout_under_min_is_ignored() {
204+
let env = InMemoryEnv::new();
205+
let default = config_from_env(&env);
206+
let default_timeout = default.prompt_timeout();
207+
208+
env.set("ACP_PROMPT_TIMEOUT_SECS", "0");
209+
let ignored = config_from_env(&env);
210+
assert_eq!(ignored.prompt_timeout(), default_timeout);
211+
}
212+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
mod config;
2+
mod signal;
3+
mod telemetry;
4+
5+
use acp_nats::{agent::Bridge, client, nats, StdJsonSerialize};
6+
use agent_client_protocol::AgentSideConnection;
7+
use async_nats::Client as NatsAsyncClient;
8+
use std::rc::Rc;
9+
use tracing::{error, info};
10+
use trogon_std::env::SystemEnv;
11+
use trogon_std::fs::SystemFs;
12+
use trogon_std::time::SystemClock;
13+
14+
#[tokio::main]
15+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
16+
let config = config::from_env_with_provider(&SystemEnv)?;
17+
telemetry::init_logger(config.acp_prefix(), &SystemEnv, &SystemFs);
18+
19+
info!("ACP bridge starting");
20+
21+
let nats_connect_timeout = config::nats_connect_timeout(&SystemEnv);
22+
let nats_client = nats::connect(config.nats(), nats_connect_timeout).await?;
23+
24+
let local = tokio::task::LocalSet::new();
25+
26+
let result = local.run_until(run_bridge(nats_client, &config)).await;
27+
28+
telemetry::shutdown_otel();
29+
30+
if let Err(ref e) = result {
31+
error!(error = %e, "ACP bridge stopped with error");
32+
} else {
33+
info!("ACP bridge stopped");
34+
}
35+
36+
result
37+
}
38+
39+
// `Rc` is intentional throughout this function: the ACP `Agent` trait is
40+
// `?Send`, so the entire bridge runs on a `LocalSet` with `spawn_local`.
41+
// Do not replace with `Arc` or move tasks to `tokio::spawn` — that would
42+
// violate the `!Send` constraint.
43+
async fn run_bridge(
44+
nats_client: NatsAsyncClient,
45+
config: &acp_nats::Config,
46+
) -> Result<(), Box<dyn std::error::Error>> {
47+
let stdin = async_compat::Compat::new(tokio::io::stdin());
48+
let stdout = async_compat::Compat::new(tokio::io::stdout());
49+
50+
let meter = opentelemetry::global::meter("acp-io-bridge-nats");
51+
let bridge = Rc::new(Bridge::new(
52+
nats_client.clone(),
53+
SystemClock,
54+
&meter,
55+
config.clone(),
56+
));
57+
58+
let (connection, io_task) = AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
59+
tokio::task::spawn_local(fut);
60+
});
61+
62+
let connection = Rc::new(connection);
63+
64+
let client_connection = connection.clone();
65+
let bridge_for_client = bridge.clone();
66+
let mut client_task = tokio::task::spawn_local(async move {
67+
client::run(nats_client, client_connection, bridge_for_client, StdJsonSerialize).await;
68+
});
69+
info!("ACP bridge running on stdio with NATS client proxy");
70+
71+
let shutdown_result = tokio::select! {
72+
result = &mut client_task => {
73+
match result {
74+
Ok(()) => {
75+
info!("ACP bridge client task completed");
76+
Ok(())
77+
}
78+
Err(e) => {
79+
error!(error = %e, "Client task ended with error");
80+
Err(Box::new(e) as Box<dyn std::error::Error>)
81+
}
82+
}
83+
}
84+
result = io_task => {
85+
match result {
86+
Err(e) => {
87+
error!(error = %e, "IO task error");
88+
Err(Box::new(e) as Box<dyn std::error::Error>)
89+
}
90+
Ok(()) => {
91+
info!("ACP bridge shutting down (IO closed)");
92+
Ok(())
93+
}
94+
}
95+
}
96+
_ = signal::shutdown_signal() => {
97+
info!("ACP bridge shutting down (signal received)");
98+
Ok(())
99+
}
100+
};
101+
102+
if !client_task.is_finished() {
103+
client_task.abort();
104+
}
105+
let _ = client_task.await;
106+
107+
shutdown_result
108+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use tracing::info;
2+
use tracing::warn;
3+
4+
pub async fn shutdown_signal() {
5+
let ctrl_c = async {
6+
if let Err(error) = tokio::signal::ctrl_c().await {
7+
warn!(error = %error, "Failed to install Ctrl+C handler");
8+
std::future::pending::<()>().await;
9+
return;
10+
}
11+
info!("Received SIGINT (Ctrl+C)");
12+
};
13+
14+
#[cfg(unix)]
15+
let terminate = async {
16+
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
17+
Ok(mut signal) => {
18+
signal.recv().await;
19+
info!("Received SIGTERM");
20+
}
21+
Err(error) => {
22+
warn!(error = %error, "Failed to install SIGTERM handler");
23+
std::future::pending::<()>().await;
24+
}
25+
}
26+
};
27+
28+
#[cfg(not(unix))]
29+
let terminate = std::future::pending::<()>();
30+
31+
tokio::select! {
32+
_ = ctrl_c => {}
33+
_ = terminate => {}
34+
}
35+
}

0 commit comments

Comments
 (0)