Skip to content

Commit 87f6986

Browse files
committed
feat(acp-nats-stdio): add stdio-to-NATS bridge binary crate
Add acp-nats-stdio crate with main entrypoint, CLI config, graceful shutdown signal handling, and OpenTelemetry telemetry integration. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 18a4b4d commit 87f6986

File tree

9 files changed

+1492
-4
lines changed

9 files changed

+1492
-4
lines changed

rsworkspace/Cargo.lock

Lines changed: 800 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.4"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
16+
trogon-std = { path = "../trogon-std" }
17+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal", "io-std"] }
18+
tracing = "0.1.44"
19+
tracing-opentelemetry = "0.32.1"
20+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
21+
22+
[dev-dependencies]
23+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
use acp_nats::{AcpPrefix, AcpPrefixError, Config, NatsConfig};
2+
use clap::Parser;
3+
use std::time::Duration;
4+
use tracing::warn;
5+
use trogon_std::env::ReadEnv;
6+
7+
const MIN_TIMEOUT_SECS: u64 = 1;
8+
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;
9+
10+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
11+
const ENV_OPERATION_TIMEOUT_SECS: &str = "ACP_OPERATION_TIMEOUT_SECS";
12+
const ENV_PROMPT_TIMEOUT_SECS: &str = "ACP_PROMPT_TIMEOUT_SECS";
13+
const ENV_CONNECT_TIMEOUT_SECS: &str = "ACP_NATS_CONNECT_TIMEOUT_SECS";
14+
const DEFAULT_ACP_PREFIX: &str = "acp";
15+
16+
#[derive(Parser, Debug)]
17+
#[command(name = "acp-nats-stdio")]
18+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
19+
pub struct Args {
20+
#[arg(long = "acp-prefix")]
21+
pub acp_prefix: Option<String>,
22+
}
23+
24+
pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Result<Config, AcpPrefixError> {
25+
let args = Args::parse();
26+
from_args(args, env_provider)
27+
}
28+
29+
pub fn from_args<E: ReadEnv>(args: Args, env_provider: &E) -> Result<Config, AcpPrefixError> {
30+
let raw_prefix = args
31+
.acp_prefix
32+
.or_else(|| env_provider.var(ENV_ACP_PREFIX).ok())
33+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string());
34+
let prefix = AcpPrefix::new(raw_prefix)?;
35+
let mut config = Config::with_prefix(prefix, NatsConfig::from_env(env_provider));
36+
37+
if let Ok(raw) = env_provider.var(ENV_OPERATION_TIMEOUT_SECS) {
38+
match raw.parse::<u64>() {
39+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
40+
config = config.with_operation_timeout(Duration::from_secs(secs));
41+
}
42+
Ok(secs) => {
43+
warn!(
44+
"{ENV_OPERATION_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
45+
);
46+
}
47+
Err(_) => {
48+
warn!("{ENV_OPERATION_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
49+
}
50+
}
51+
}
52+
53+
if let Ok(raw) = env_provider.var(ENV_PROMPT_TIMEOUT_SECS) {
54+
match raw.parse::<u64>() {
55+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
56+
config = config.with_prompt_timeout(Duration::from_secs(secs));
57+
}
58+
Ok(secs) => {
59+
warn!(
60+
"{ENV_PROMPT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
61+
);
62+
}
63+
Err(_) => {
64+
warn!("{ENV_PROMPT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
65+
}
66+
}
67+
}
68+
69+
Ok(config)
70+
}
71+
72+
pub fn nats_connect_timeout<E: ReadEnv>(env_provider: &E) -> Duration {
73+
let default = Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS);
74+
75+
match env_provider.var(ENV_CONNECT_TIMEOUT_SECS) {
76+
Ok(raw) => match raw.parse::<u64>() {
77+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => Duration::from_secs(secs),
78+
Ok(secs) => {
79+
warn!(
80+
"{ENV_CONNECT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
81+
);
82+
default
83+
}
84+
Err(_) => {
85+
warn!("{ENV_CONNECT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
86+
default
87+
}
88+
},
89+
Err(_) => default,
90+
}
91+
}
92+
93+
#[cfg(test)]
94+
mod tests {
95+
use super::*;
96+
use trogon_std::env::InMemoryEnv;
97+
98+
fn config_from_env(env: &InMemoryEnv) -> Config {
99+
let args = Args { acp_prefix: None };
100+
from_args(args, env).unwrap()
101+
}
102+
103+
#[test]
104+
fn test_default_config() {
105+
let env = InMemoryEnv::new();
106+
let config = config_from_env(&env);
107+
assert_eq!(config.acp_prefix(), DEFAULT_ACP_PREFIX);
108+
assert_eq!(config.nats().servers, vec!["localhost:4222"]);
109+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::None));
110+
}
111+
112+
#[test]
113+
fn test_acp_prefix_from_env_provider() {
114+
let env = InMemoryEnv::new();
115+
env.set("ACP_PREFIX", "custom-prefix");
116+
let config = config_from_env(&env);
117+
assert_eq!(config.acp_prefix(), "custom-prefix");
118+
}
119+
120+
#[test]
121+
fn test_acp_prefix_from_args() {
122+
let env = InMemoryEnv::new();
123+
let args = Args {
124+
acp_prefix: Some("cli-prefix".to_string()),
125+
};
126+
let config = from_args(args, &env).unwrap();
127+
assert_eq!(config.acp_prefix(), "cli-prefix");
128+
}
129+
130+
#[test]
131+
fn test_args_override_env() {
132+
let env = InMemoryEnv::new();
133+
env.set("ACP_PREFIX", "env-prefix");
134+
let args = Args {
135+
acp_prefix: Some("cli-prefix".to_string()),
136+
};
137+
let config = from_args(args, &env).unwrap();
138+
assert_eq!(config.acp_prefix(), "cli-prefix");
139+
}
140+
141+
#[test]
142+
fn test_nats_config_from_env() {
143+
let env = InMemoryEnv::new();
144+
env.set("NATS_URL", "host1:4222,host2:4222");
145+
env.set("NATS_TOKEN", "my-token");
146+
let config = config_from_env(&env);
147+
assert_eq!(config.nats().servers, vec!["host1:4222", "host2:4222"]);
148+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
149+
}
150+
151+
#[test]
152+
fn test_nats_connect_timeout_from_env() {
153+
let env = InMemoryEnv::new();
154+
assert_eq!(
155+
nats_connect_timeout(&env),
156+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
157+
);
158+
159+
env.set(ENV_CONNECT_TIMEOUT_SECS, "15");
160+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(15));
161+
env.set(ENV_CONNECT_TIMEOUT_SECS, "0");
162+
assert_eq!(
163+
nats_connect_timeout(&env),
164+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
165+
);
166+
env.set(ENV_CONNECT_TIMEOUT_SECS, "not-a-number");
167+
assert_eq!(
168+
nats_connect_timeout(&env),
169+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
170+
);
171+
}
172+
173+
#[test]
174+
fn test_operation_timeout_invalid_env_is_ignored() {
175+
let env = InMemoryEnv::new();
176+
let default = config_from_env(&env);
177+
let default_timeout = default.operation_timeout();
178+
179+
env.set("ACP_OPERATION_TIMEOUT_SECS", "not-a-number");
180+
let invalid = config_from_env(&env);
181+
assert_eq!(invalid.operation_timeout(), default_timeout);
182+
}
183+
184+
#[test]
185+
fn test_operation_timeout_under_min_is_ignored() {
186+
let env = InMemoryEnv::new();
187+
let default = config_from_env(&env);
188+
let default_timeout = default.operation_timeout();
189+
190+
env.set("ACP_OPERATION_TIMEOUT_SECS", "0");
191+
let ignored = config_from_env(&env);
192+
assert_eq!(ignored.operation_timeout(), default_timeout);
193+
}
194+
195+
#[test]
196+
fn test_prompt_timeout_invalid_env_is_ignored() {
197+
let env = InMemoryEnv::new();
198+
let default = config_from_env(&env);
199+
let default_timeout = default.prompt_timeout();
200+
201+
env.set("ACP_PROMPT_TIMEOUT_SECS", "not-a-number");
202+
let invalid = config_from_env(&env);
203+
assert_eq!(invalid.prompt_timeout(), default_timeout);
204+
}
205+
206+
#[test]
207+
fn test_prompt_timeout_under_min_is_ignored() {
208+
let env = InMemoryEnv::new();
209+
let default = config_from_env(&env);
210+
let default_timeout = default.prompt_timeout();
211+
212+
env.set("ACP_PROMPT_TIMEOUT_SECS", "0");
213+
let ignored = config_from_env(&env);
214+
assert_eq!(ignored.prompt_timeout(), default_timeout);
215+
}
216+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
mod config;
2+
mod signal;
3+
mod telemetry;
4+
5+
use acp_nats::{StdJsonSerialize, agent::Bridge, client, nats};
6+
use agent_client_protocol::AgentSideConnection;
7+
use async_nats::Client as NatsAsyncClient;
8+
use std::rc::Rc;
9+
use tracing::{error, info};
10+
use trogon_std::env::SystemEnv;
11+
use trogon_std::fs::SystemFs;
12+
use trogon_std::time::SystemClock;
13+
14+
#[tokio::main]
15+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
16+
let config = config::from_env_with_provider(&SystemEnv)?;
17+
telemetry::init_logger(config.acp_prefix(), &SystemEnv, &SystemFs);
18+
19+
info!("ACP bridge starting");
20+
21+
let nats_connect_timeout = config::nats_connect_timeout(&SystemEnv);
22+
let nats_client = nats::connect(config.nats(), nats_connect_timeout).await?;
23+
24+
let local = tokio::task::LocalSet::new();
25+
26+
let result = local.run_until(run_bridge(nats_client, &config)).await;
27+
28+
telemetry::shutdown_otel();
29+
30+
if let Err(ref e) = result {
31+
error!(error = %e, "ACP bridge stopped with error");
32+
} else {
33+
info!("ACP bridge stopped");
34+
}
35+
36+
result
37+
}
38+
39+
// `Rc` is intentional throughout this function: the ACP `Agent` trait is
40+
// `?Send`, so the entire bridge runs on a `LocalSet` with `spawn_local`.
41+
// Do not replace with `Arc` or move tasks to `tokio::spawn` — that would
42+
// violate the `!Send` constraint.
43+
async fn run_bridge(
44+
nats_client: NatsAsyncClient,
45+
config: &acp_nats::Config,
46+
) -> Result<(), Box<dyn std::error::Error>> {
47+
let stdin = async_compat::Compat::new(tokio::io::stdin());
48+
let stdout = async_compat::Compat::new(tokio::io::stdout());
49+
50+
let meter = opentelemetry::global::meter("acp-io-bridge-nats");
51+
let bridge = Rc::new(Bridge::new(
52+
nats_client.clone(),
53+
SystemClock,
54+
&meter,
55+
config.clone(),
56+
));
57+
58+
let (connection, io_task) = AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
59+
tokio::task::spawn_local(fut);
60+
});
61+
62+
let connection = Rc::new(connection);
63+
64+
let client_connection = connection.clone();
65+
let bridge_for_client = bridge.clone();
66+
let mut client_task = tokio::task::spawn_local(async move {
67+
client::run(
68+
nats_client,
69+
client_connection,
70+
bridge_for_client,
71+
StdJsonSerialize,
72+
)
73+
.await;
74+
});
75+
info!("ACP bridge running on stdio with NATS client proxy");
76+
77+
let shutdown_result = tokio::select! {
78+
result = &mut client_task => {
79+
match result {
80+
Ok(()) => {
81+
info!("ACP bridge client task completed");
82+
Ok(())
83+
}
84+
Err(e) => {
85+
error!(error = %e, "Client task ended with error");
86+
Err(Box::new(e) as Box<dyn std::error::Error>)
87+
}
88+
}
89+
}
90+
result = io_task => {
91+
match result {
92+
Err(e) => {
93+
error!(error = %e, "IO task error");
94+
Err(Box::new(e) as Box<dyn std::error::Error>)
95+
}
96+
Ok(()) => {
97+
info!("ACP bridge shutting down (IO closed)");
98+
Ok(())
99+
}
100+
}
101+
}
102+
_ = signal::shutdown_signal() => {
103+
info!("ACP bridge shutting down (signal received)");
104+
Ok(())
105+
}
106+
};
107+
108+
if !client_task.is_finished() {
109+
client_task.abort();
110+
}
111+
let _ = client_task.await;
112+
113+
shutdown_result
114+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use tracing::info;
2+
use tracing::warn;
3+
4+
pub async fn shutdown_signal() {
5+
let ctrl_c = async {
6+
if let Err(error) = tokio::signal::ctrl_c().await {
7+
warn!(error = %error, "Failed to install Ctrl+C handler");
8+
std::future::pending::<()>().await;
9+
return;
10+
}
11+
info!("Received SIGINT (Ctrl+C)");
12+
};
13+
14+
#[cfg(unix)]
15+
let terminate = async {
16+
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
17+
Ok(mut signal) => {
18+
signal.recv().await;
19+
info!("Received SIGTERM");
20+
}
21+
Err(error) => {
22+
warn!(error = %error, "Failed to install SIGTERM handler");
23+
std::future::pending::<()>().await;
24+
}
25+
}
26+
};
27+
28+
#[cfg(not(unix))]
29+
let terminate = std::future::pending::<()>();
30+
31+
tokio::select! {
32+
_ = ctrl_c => {}
33+
_ = terminate => {}
34+
}
35+
}

0 commit comments

Comments
 (0)