Skip to content

Commit dbbaff9

Browse files
committed
feat(acp-nats-stdio): add stdio-to-NATS bridge binary crate
Add acp-nats-stdio crate with main entrypoint, CLI config, graceful shutdown signal handling, and OpenTelemetry telemetry integration. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent a19a17d commit dbbaff9

9 files changed

Lines changed: 1498 additions & 4 deletions

File tree

rsworkspace/Cargo.lock

Lines changed: 800 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.4"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
16+
trogon-std = { path = "../trogon-std" }
17+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal", "io-std"] }
18+
tracing = "0.1.44"
19+
tracing-opentelemetry = "0.32.1"
20+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
21+
22+
[dev-dependencies]
23+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
use acp_nats::{AcpPrefix, AcpPrefixError, Config, NatsConfig};
2+
use clap::Parser;
3+
use std::time::Duration;
4+
use tracing::warn;
5+
use trogon_std::env::ReadEnv;
6+
7+
const MIN_TIMEOUT_SECS: u64 = 1;
8+
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;
9+
10+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
11+
const ENV_OPERATION_TIMEOUT_SECS: &str = "ACP_OPERATION_TIMEOUT_SECS";
12+
const ENV_PROMPT_TIMEOUT_SECS: &str = "ACP_PROMPT_TIMEOUT_SECS";
13+
const ENV_CONNECT_TIMEOUT_SECS: &str = "ACP_NATS_CONNECT_TIMEOUT_SECS";
14+
const DEFAULT_ACP_PREFIX: &str = "acp";
15+
16+
#[derive(Parser, Debug)]
17+
#[command(name = "acp-nats-stdio")]
18+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
19+
pub struct Args {
20+
#[arg(long = "acp-prefix")]
21+
pub acp_prefix: Option<String>,
22+
}
23+
24+
pub fn base_config<E: ReadEnv>(env_provider: &E) -> Result<Config, AcpPrefixError> {
25+
let args = Args::parse();
26+
base_config_from_args(args, env_provider)
27+
}
28+
29+
fn base_config_from_args<E: ReadEnv>(
30+
args: Args,
31+
env_provider: &E,
32+
) -> Result<Config, AcpPrefixError> {
33+
let raw_prefix = args
34+
.acp_prefix
35+
.or_else(|| env_provider.var(ENV_ACP_PREFIX).ok())
36+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string());
37+
let prefix = AcpPrefix::new(raw_prefix)?;
38+
Ok(Config::with_prefix(
39+
prefix,
40+
NatsConfig::from_env(env_provider),
41+
))
42+
}
43+
44+
pub fn apply_timeout_overrides<E: ReadEnv>(config: Config, env_provider: &E) -> Config {
45+
let mut config = config;
46+
47+
if let Ok(raw) = env_provider.var(ENV_OPERATION_TIMEOUT_SECS) {
48+
match raw.parse::<u64>() {
49+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
50+
config = config.with_operation_timeout(Duration::from_secs(secs));
51+
}
52+
Ok(secs) => {
53+
warn!(
54+
"{ENV_OPERATION_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
55+
);
56+
}
57+
Err(_) => {
58+
warn!("{ENV_OPERATION_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
59+
}
60+
}
61+
}
62+
63+
if let Ok(raw) = env_provider.var(ENV_PROMPT_TIMEOUT_SECS) {
64+
match raw.parse::<u64>() {
65+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => {
66+
config = config.with_prompt_timeout(Duration::from_secs(secs));
67+
}
68+
Ok(secs) => {
69+
warn!(
70+
"{ENV_PROMPT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
71+
);
72+
}
73+
Err(_) => {
74+
warn!("{ENV_PROMPT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
75+
}
76+
}
77+
}
78+
79+
config
80+
}
81+
82+
pub fn nats_connect_timeout<E: ReadEnv>(env_provider: &E) -> Duration {
83+
let default = Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS);
84+
85+
match env_provider.var(ENV_CONNECT_TIMEOUT_SECS) {
86+
Ok(raw) => match raw.parse::<u64>() {
87+
Ok(secs) if secs >= MIN_TIMEOUT_SECS => Duration::from_secs(secs),
88+
Ok(secs) => {
89+
warn!(
90+
"{ENV_CONNECT_TIMEOUT_SECS}={secs} is below minimum ({MIN_TIMEOUT_SECS}), using default"
91+
);
92+
default
93+
}
94+
Err(_) => {
95+
warn!("{ENV_CONNECT_TIMEOUT_SECS}={raw:?} is not a valid integer, using default");
96+
default
97+
}
98+
},
99+
Err(_) => default,
100+
}
101+
}
102+
103+
#[cfg(test)]
104+
mod tests {
105+
use super::*;
106+
use trogon_std::env::InMemoryEnv;
107+
108+
fn config_from_env(env: &InMemoryEnv) -> Config {
109+
let args = Args { acp_prefix: None };
110+
let config = base_config_from_args(args, env).unwrap();
111+
apply_timeout_overrides(config, env)
112+
}
113+
114+
#[test]
115+
fn test_default_config() {
116+
let env = InMemoryEnv::new();
117+
let config = config_from_env(&env);
118+
assert_eq!(config.acp_prefix(), DEFAULT_ACP_PREFIX);
119+
assert_eq!(config.nats().servers, vec!["localhost:4222"]);
120+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::None));
121+
}
122+
123+
#[test]
124+
fn test_acp_prefix_from_env_provider() {
125+
let env = InMemoryEnv::new();
126+
env.set("ACP_PREFIX", "custom-prefix");
127+
let config = config_from_env(&env);
128+
assert_eq!(config.acp_prefix(), "custom-prefix");
129+
}
130+
131+
#[test]
132+
fn test_acp_prefix_from_args() {
133+
let env = InMemoryEnv::new();
134+
let args = Args {
135+
acp_prefix: Some("cli-prefix".to_string()),
136+
};
137+
let config = base_config_from_args(args, &env).unwrap();
138+
assert_eq!(config.acp_prefix(), "cli-prefix");
139+
}
140+
141+
#[test]
142+
fn test_args_override_env() {
143+
let env = InMemoryEnv::new();
144+
env.set("ACP_PREFIX", "env-prefix");
145+
let args = Args {
146+
acp_prefix: Some("cli-prefix".to_string()),
147+
};
148+
let config = base_config_from_args(args, &env).unwrap();
149+
assert_eq!(config.acp_prefix(), "cli-prefix");
150+
}
151+
152+
#[test]
153+
fn test_nats_config_from_env() {
154+
let env = InMemoryEnv::new();
155+
env.set("NATS_URL", "host1:4222,host2:4222");
156+
env.set("NATS_TOKEN", "my-token");
157+
let config = config_from_env(&env);
158+
assert_eq!(config.nats().servers, vec!["host1:4222", "host2:4222"]);
159+
assert!(matches!(&config.nats().auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
160+
}
161+
162+
#[test]
163+
fn test_nats_connect_timeout_from_env() {
164+
let env = InMemoryEnv::new();
165+
assert_eq!(
166+
nats_connect_timeout(&env),
167+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
168+
);
169+
170+
env.set(ENV_CONNECT_TIMEOUT_SECS, "15");
171+
assert_eq!(nats_connect_timeout(&env), Duration::from_secs(15));
172+
env.set(ENV_CONNECT_TIMEOUT_SECS, "0");
173+
assert_eq!(
174+
nats_connect_timeout(&env),
175+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
176+
);
177+
env.set(ENV_CONNECT_TIMEOUT_SECS, "not-a-number");
178+
assert_eq!(
179+
nats_connect_timeout(&env),
180+
Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)
181+
);
182+
}
183+
184+
#[test]
185+
fn test_operation_timeout_invalid_env_is_ignored() {
186+
let env = InMemoryEnv::new();
187+
let default = config_from_env(&env);
188+
let default_timeout = default.operation_timeout();
189+
190+
env.set("ACP_OPERATION_TIMEOUT_SECS", "not-a-number");
191+
let invalid = config_from_env(&env);
192+
assert_eq!(invalid.operation_timeout(), default_timeout);
193+
}
194+
195+
#[test]
196+
fn test_operation_timeout_under_min_is_ignored() {
197+
let env = InMemoryEnv::new();
198+
let default = config_from_env(&env);
199+
let default_timeout = default.operation_timeout();
200+
201+
env.set("ACP_OPERATION_TIMEOUT_SECS", "0");
202+
let ignored = config_from_env(&env);
203+
assert_eq!(ignored.operation_timeout(), default_timeout);
204+
}
205+
206+
#[test]
207+
fn test_prompt_timeout_invalid_env_is_ignored() {
208+
let env = InMemoryEnv::new();
209+
let default = config_from_env(&env);
210+
let default_timeout = default.prompt_timeout();
211+
212+
env.set("ACP_PROMPT_TIMEOUT_SECS", "not-a-number");
213+
let invalid = config_from_env(&env);
214+
assert_eq!(invalid.prompt_timeout(), default_timeout);
215+
}
216+
217+
#[test]
218+
fn test_prompt_timeout_under_min_is_ignored() {
219+
let env = InMemoryEnv::new();
220+
let default = config_from_env(&env);
221+
let default_timeout = default.prompt_timeout();
222+
223+
env.set("ACP_PROMPT_TIMEOUT_SECS", "0");
224+
let ignored = config_from_env(&env);
225+
assert_eq!(ignored.prompt_timeout(), default_timeout);
226+
}
227+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
mod config;
2+
mod signal;
3+
mod telemetry;
4+
5+
use acp_nats::{StdJsonSerialize, agent::Bridge, client, nats};
6+
use agent_client_protocol::AgentSideConnection;
7+
use async_nats::Client as NatsAsyncClient;
8+
use std::rc::Rc;
9+
use tracing::{error, info};
10+
use trogon_std::env::SystemEnv;
11+
use trogon_std::fs::SystemFs;
12+
use trogon_std::time::SystemClock;
13+
14+
#[tokio::main]
15+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
16+
let config = config::base_config(&SystemEnv)?;
17+
telemetry::init_logger(config.acp_prefix(), &SystemEnv, &SystemFs);
18+
let config = config::apply_timeout_overrides(config, &SystemEnv);
19+
20+
info!("ACP bridge starting");
21+
22+
let nats_connect_timeout = config::nats_connect_timeout(&SystemEnv);
23+
let nats_client = nats::connect(config.nats(), nats_connect_timeout).await?;
24+
25+
let local = tokio::task::LocalSet::new();
26+
27+
let result = local.run_until(run_bridge(nats_client, &config)).await;
28+
29+
if let Err(ref e) = result {
30+
error!(error = %e, "ACP bridge stopped with error");
31+
} else {
32+
info!("ACP bridge stopped");
33+
}
34+
35+
telemetry::shutdown_otel();
36+
37+
result
38+
}
39+
40+
// `Rc` is intentional throughout this function: the ACP `Agent` trait is
41+
// `?Send`, so the entire bridge runs on a `LocalSet` with `spawn_local`.
42+
// Do not replace with `Arc` or move tasks to `tokio::spawn` — that would
43+
// violate the `!Send` constraint.
44+
async fn run_bridge(
45+
nats_client: NatsAsyncClient,
46+
config: &acp_nats::Config,
47+
) -> Result<(), Box<dyn std::error::Error>> {
48+
let stdin = async_compat::Compat::new(tokio::io::stdin());
49+
let stdout = async_compat::Compat::new(tokio::io::stdout());
50+
51+
let meter = opentelemetry::global::meter("acp-io-bridge-nats");
52+
let bridge = Rc::new(Bridge::new(
53+
nats_client.clone(),
54+
SystemClock,
55+
&meter,
56+
config.clone(),
57+
));
58+
59+
let (connection, io_task) = AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
60+
tokio::task::spawn_local(fut);
61+
});
62+
63+
let connection = Rc::new(connection);
64+
65+
let client_connection = connection.clone();
66+
let bridge_for_client = bridge.clone();
67+
let mut client_task = tokio::task::spawn_local(async move {
68+
client::run(
69+
nats_client,
70+
client_connection,
71+
bridge_for_client,
72+
StdJsonSerialize,
73+
)
74+
.await;
75+
});
76+
info!("ACP bridge running on stdio with NATS client proxy");
77+
78+
let shutdown_result = tokio::select! {
79+
result = &mut client_task => {
80+
match result {
81+
Ok(()) => {
82+
info!("ACP bridge client task completed");
83+
Ok(())
84+
}
85+
Err(e) => {
86+
error!(error = %e, "Client task ended with error");
87+
Err(Box::new(e) as Box<dyn std::error::Error>)
88+
}
89+
}
90+
}
91+
result = io_task => {
92+
match result {
93+
Err(e) => {
94+
error!(error = %e, "IO task error");
95+
Err(Box::new(e) as Box<dyn std::error::Error>)
96+
}
97+
Ok(()) => {
98+
info!("ACP bridge shutting down (IO closed)");
99+
Ok(())
100+
}
101+
}
102+
}
103+
_ = signal::shutdown_signal() => {
104+
info!("ACP bridge shutting down (signal received)");
105+
Ok(())
106+
}
107+
};
108+
109+
if !client_task.is_finished() {
110+
client_task.abort();
111+
let _ = client_task.await;
112+
}
113+
114+
shutdown_result
115+
}

0 commit comments

Comments
 (0)