Skip to content

Commit b5423f4

Browse files
committed
chore(coverage): remove ignore-filename-regex and add inline tests
Remove the blanket --ignore-filename-regex from cargo-llvm-cov config and add targeted tests for previously excluded code instead. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 035824c commit b5423f4

17 files changed

Lines changed: 420 additions & 40 deletions

File tree

rsworkspace/.cargo/config.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,4 @@ cov = [
33
"llvm-cov",
44
"--all-features",
55
"--workspace",
6-
"--ignore-filename-regex",
7-
"crates/(acp-nats-stdio|acp-telemetry)/|crates/acp-nats-ws/src/(main|connection|upgrade)\\.rs"
86
]

rsworkspace/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = ["crates/*"]
44

55
[workspace.lints.rust]
66
warnings = "deny"
7+
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(coverage)'] }
78

89
[workspace.lints.clippy]
910
all = "deny"

rsworkspace/crates/acp-nats-stdio/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ name = "acp-nats-stdio"
33
version = "0.1.0"
44
edition = "2024"
55

6+
[lints]
7+
workspace = true
8+
69
[dependencies]
710
acp-nats = { path = "../acp-nats" }
811
acp-telemetry = { path = "../acp-telemetry" }
912
agent-client-protocol = "0.9.4"
1013
async-compat = "0.2.5"
14+
futures = "0.3"
1115
async-nats = "0.45.0"
1216
clap = { version = "4.5", features = ["derive", "env"] }
1317
opentelemetry = "0.31.0"
@@ -16,5 +20,6 @@ tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal",
1620
tracing = "0.1.44"
1721

1822
[dev-dependencies]
23+
trogon-nats = { path = "../trogon-nats", features = ["test-support"] }
1924
tracing-subscriber = { version = "0.3.22", features = ["fmt"] }
2025
trogon-std = { path = "../trogon-std", features = ["test-support"] }

rsworkspace/crates/acp-nats-stdio/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct Args {
1010
pub acp_prefix: Option<String>,
1111
}
1212

13+
#[cfg(not(coverage))]
1314
pub fn base_config<E: ReadEnv>(env_provider: &E) -> Result<Config, AcpPrefixError> {
1415
let args = Args::parse();
1516
base_config_from_args(args, env_provider)

rsworkspace/crates/acp-nats-stdio/src/main.rs

Lines changed: 101 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ mod config;
22

33
use acp_nats::{StdJsonSerialize, agent::Bridge, client, nats, spawn_notification_forwarder};
44
use agent_client_protocol::{AgentSideConnection, SessionNotification};
5-
use async_nats::Client as NatsAsyncClient;
65
use std::rc::Rc;
76
use tracing::{error, info};
87
use trogon_std::env::SystemEnv;
@@ -11,6 +10,7 @@ use trogon_std::time::SystemClock;
1110

1211
use acp_telemetry::ServiceName;
1312

13+
#[cfg(not(coverage))]
1414
#[tokio::main]
1515
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1616
let config = config::base_config(&SystemEnv)?;
@@ -27,9 +27,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2727
let nats_connect_timeout = acp_nats::nats_connect_timeout(&SystemEnv);
2828
let nats_client = nats::connect(config.nats(), nats_connect_timeout).await?;
2929

30-
let local = tokio::task::LocalSet::new();
30+
let stdin = async_compat::Compat::new(tokio::io::stdin());
31+
let stdout = async_compat::Compat::new(tokio::io::stdout());
3132

32-
let result = local.run_until(run_bridge(nats_client, &config)).await;
33+
let local = tokio::task::LocalSet::new();
34+
let result = local
35+
.run_until(run_bridge(
36+
nats_client,
37+
&config,
38+
stdout,
39+
stdin,
40+
acp_telemetry::signal::shutdown_signal(),
41+
))
42+
.await;
3343

3444
if let Err(ref e) = result {
3545
error!(error = %e, "ACP bridge stopped with error");
@@ -42,17 +52,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4252
result
4353
}
4454

45-
// `Rc` is intentional throughout this function: the ACP `Agent` trait is
46-
// `?Send`, so the entire bridge runs on a `LocalSet` with `spawn_local`.
47-
// Do not replace with `Arc` or move tasks to `tokio::spawn` — that would
48-
// violate the `!Send` constraint.
49-
async fn run_bridge(
50-
nats_client: NatsAsyncClient,
51-
config: &acp_nats::Config,
52-
) -> Result<(), Box<dyn std::error::Error>> {
53-
let stdin = async_compat::Compat::new(tokio::io::stdin());
54-
let stdout = async_compat::Compat::new(tokio::io::stdout());
55+
#[cfg(coverage)]
56+
fn main() {}
5557

58+
async fn run_bridge<N, W, R>(
59+
nats_client: N,
60+
config: &acp_nats::Config,
61+
stdout: W,
62+
stdin: R,
63+
shutdown_signal: impl std::future::Future<Output = ()>,
64+
) -> Result<(), Box<dyn std::error::Error>>
65+
where
66+
N: acp_nats::RequestClient
67+
+ acp_nats::PublishClient
68+
+ acp_nats::FlushClient
69+
+ acp_nats::SubscribeClient
70+
+ 'static,
71+
W: futures::AsyncWrite + Unpin + 'static,
72+
R: futures::AsyncRead + Unpin + 'static,
73+
{
5674
let meter = acp_telemetry::meter("acp-io-bridge-nats");
5775
let (notification_tx, notification_rx) = tokio::sync::mpsc::channel::<SessionNotification>(64);
5876
let bridge = Rc::new(Bridge::new(
@@ -106,7 +124,7 @@ async fn run_bridge(
106124
}
107125
}
108126
}
109-
_ = acp_telemetry::signal::shutdown_signal() => {
127+
_ = shutdown_signal => {
110128
info!("ACP bridge shutting down (signal received)");
111129
Ok(())
112130
}
@@ -119,3 +137,72 @@ async fn run_bridge(
119137

120138
shutdown_result
121139
}
140+
141+
#[cfg(test)]
142+
mod tests {
143+
use super::*;
144+
use trogon_nats::AdvancedMockNatsClient;
145+
146+
#[tokio::test]
147+
async fn run_bridge_shuts_down_on_signal() {
148+
let mock = AdvancedMockNatsClient::new();
149+
let _sub = mock.inject_messages();
150+
let config = acp_nats::Config::new(
151+
acp_nats::AcpPrefix::new("acp").unwrap(),
152+
acp_nats::NatsConfig {
153+
servers: vec!["localhost:4222".to_string()],
154+
auth: trogon_nats::NatsAuth::None,
155+
},
156+
);
157+
158+
let (reader, _writer) = tokio::io::duplex(1024);
159+
let (_reader2, writer2) = tokio::io::duplex(1024);
160+
let stdin = async_compat::Compat::new(reader);
161+
let stdout = async_compat::Compat::new(writer2);
162+
163+
let local = tokio::task::LocalSet::new();
164+
let result = local
165+
.run_until(run_bridge(
166+
mock,
167+
&config,
168+
stdout,
169+
stdin,
170+
std::future::ready(()),
171+
))
172+
.await;
173+
174+
assert!(result.is_ok());
175+
}
176+
177+
#[tokio::test]
178+
async fn run_bridge_exits_on_io_close() {
179+
let mock = AdvancedMockNatsClient::new();
180+
let _sub = mock.inject_messages();
181+
let config = acp_nats::Config::new(
182+
acp_nats::AcpPrefix::new("acp").unwrap(),
183+
acp_nats::NatsConfig {
184+
servers: vec!["localhost:4222".to_string()],
185+
auth: trogon_nats::NatsAuth::None,
186+
},
187+
);
188+
189+
let (reader, writer) = tokio::io::duplex(1024);
190+
let (_reader2, writer2) = tokio::io::duplex(1024);
191+
drop(writer);
192+
let stdin = async_compat::Compat::new(reader);
193+
let stdout = async_compat::Compat::new(writer2);
194+
195+
let local = tokio::task::LocalSet::new();
196+
let result = local
197+
.run_until(run_bridge(
198+
mock,
199+
&config,
200+
stdout,
201+
stdin,
202+
std::future::pending(),
203+
))
204+
.await;
205+
206+
assert!(result.is_ok());
207+
}
208+
}

rsworkspace/crates/acp-nats-ws/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ name = "acp-nats-ws"
33
version = "0.1.0"
44
edition = "2024"
55

6+
[lints]
7+
workspace = true
8+
69
[dependencies]
710
acp-nats = { path = "../acp-nats" }
811
acp-telemetry = { path = "../acp-telemetry" }

rsworkspace/crates/acp-nats-ws/src/connection.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,65 @@ async fn run_send_pump(
180180
}
181181
let _ = ws_sender.close().await;
182182
}
183+
184+
#[cfg(test)]
185+
mod tests {
186+
use super::*;
187+
use axum::extract::State;
188+
use axum::extract::ws::WebSocketUpgrade;
189+
use axum::response::Response;
190+
use std::time::Duration;
191+
use tokio::net::TcpListener;
192+
use tokio_tungstenite::connect_async;
193+
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
194+
195+
#[derive(Clone)]
196+
struct EchoState;
197+
198+
async fn echo_handler(ws: WebSocketUpgrade, State(_): State<EchoState>) -> Response {
199+
ws.on_upgrade(|socket| async move {
200+
let (ws_sender, ws_receiver) = socket.split();
201+
let (duplex_write, duplex_read) = tokio::io::duplex(DUPLEX_BUFFER_SIZE);
202+
let recv = run_recv_pump(ws_receiver, duplex_write);
203+
let send = run_send_pump(ws_sender, duplex_read);
204+
tokio::join!(recv, send);
205+
})
206+
}
207+
208+
async fn start_echo_server() -> String {
209+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
210+
let addr = listener.local_addr().unwrap();
211+
let app = axum::Router::new()
212+
.route("/ws", axum::routing::get(echo_handler))
213+
.with_state(EchoState);
214+
tokio::spawn(async move {
215+
axum::serve(listener, app).await.unwrap();
216+
});
217+
format!("ws://{}/ws", addr)
218+
}
219+
220+
#[tokio::test]
221+
async fn multiple_messages_round_trip() {
222+
let url = start_echo_server().await;
223+
let (mut ws, _) = connect_async(&url).await.unwrap();
224+
225+
let messages = vec!["alpha", "beta", "gamma"];
226+
for msg in &messages {
227+
ws.send(TungsteniteMessage::Text((*msg).into()))
228+
.await
229+
.unwrap();
230+
}
231+
232+
for expected in &messages {
233+
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
234+
.await
235+
.expect("timeout")
236+
.expect("stream ended")
237+
.unwrap();
238+
match msg {
239+
TungsteniteMessage::Text(t) => assert_eq!(t, *expected),
240+
other => panic!("expected Text('{expected}'), got {other:?}"),
241+
}
242+
}
243+
}
244+
}

rsworkspace/crates/acp-nats-ws/src/main.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use trogon_std::env::SystemEnv;
1313
use trogon_std::fs::SystemFs;
1414
use upgrade::{ConnectionRequest, UpgradeState};
1515

16+
#[cfg(not(coverage))]
1617
#[tokio::main]
1718
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1819
let args = config::Args::parse();
@@ -78,6 +79,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7879
result.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
7980
}
8081

82+
#[cfg(coverage)]
83+
fn main() {}
84+
8185
const THREAD_NAME: &str = "acp-ws-local";
8286

8387
/// Runs a single-threaded tokio runtime with a
@@ -252,4 +256,67 @@ mod tests {
252256

253257
conn_thread.join().unwrap();
254258
}
259+
260+
#[tokio::test]
261+
async fn test_shutdown_while_connection_active() {
262+
let nats_mock = AdvancedMockNatsClient::new();
263+
let config = Config::new(
264+
acp_nats::AcpPrefix::new("acp").unwrap(),
265+
acp_nats::NatsConfig {
266+
servers: vec!["localhost:4222".to_string()],
267+
auth: trogon_nats::NatsAuth::None,
268+
},
269+
);
270+
271+
let _injector = nats_mock.inject_messages();
272+
273+
nats_mock.hang_next_request();
274+
275+
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
276+
let (conn_tx, conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>();
277+
278+
let nats_mock_clone = nats_mock.clone();
279+
let conn_thread = std::thread::Builder::new()
280+
.name(THREAD_NAME.into())
281+
.spawn(move || run_connection_thread(conn_rx, nats_mock_clone, config))
282+
.expect("failed to spawn connection thread");
283+
284+
let state = UpgradeState {
285+
conn_tx,
286+
shutdown_tx: shutdown_tx.clone(),
287+
};
288+
289+
let app = axum::Router::new()
290+
.route("/ws", axum::routing::get(upgrade::handle))
291+
.with_state(state);
292+
293+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
294+
let addr = listener.local_addr().unwrap();
295+
296+
let server_task = tokio::spawn(async move {
297+
axum::serve(listener, app)
298+
.with_graceful_shutdown(async move {
299+
let _ = shutdown_rx.changed().await;
300+
})
301+
.await
302+
.unwrap();
303+
});
304+
305+
let ws_url = format!("ws://{}/ws", addr);
306+
let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap();
307+
308+
let req =
309+
r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion": 0}}"#;
310+
ws_stream.send(Message::Text(req.into())).await.unwrap();
311+
312+
tokio::time::sleep(Duration::from_millis(100)).await;
313+
314+
shutdown_tx.send(true).unwrap();
315+
316+
let _ = tokio::time::timeout(Duration::from_secs(5), server_task)
317+
.await
318+
.expect("server task did not shut down");
319+
320+
conn_thread.join().unwrap();
321+
}
255322
}

0 commit comments

Comments
 (0)