Skip to content

Commit 8d252e5

Browse files
committed
chore(coverage): remove ignore-filename-regex and add inline tests
Remove the blanket --ignore-filename-regex from cargo-llvm-cov config and add targeted tests for previously excluded code instead. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 035824c commit 8d252e5

File tree

9 files changed

+263
-2
lines changed

9 files changed

+263
-2
lines changed

rsworkspace/.cargo/config.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,4 @@ cov = [
33
"llvm-cov",
44
"--all-features",
55
"--workspace",
6-
"--ignore-filename-regex",
7-
"crates/(acp-nats-stdio|acp-telemetry)/|crates/acp-nats-ws/src/(main|connection|upgrade)\\.rs"
86
]

rsworkspace/crates/acp-nats-ws/src/connection.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,66 @@ async fn run_send_pump(
180180
}
181181
let _ = ws_sender.close().await;
182182
}
183+
184+
#[cfg(test)]
185+
mod tests {
186+
use super::*;
187+
use axum::extract::State;
188+
use axum::extract::ws::WebSocketUpgrade;
189+
use axum::response::Response;
190+
use std::time::Duration;
191+
use tokio::io::AsyncReadExt;
192+
use tokio::net::TcpListener;
193+
use tokio_tungstenite::connect_async;
194+
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
195+
196+
#[derive(Clone)]
197+
struct EchoState;
198+
199+
async fn echo_handler(ws: WebSocketUpgrade, State(_): State<EchoState>) -> Response {
200+
ws.on_upgrade(|socket| async move {
201+
let (ws_sender, ws_receiver) = socket.split();
202+
let (duplex_write, duplex_read) = tokio::io::duplex(DUPLEX_BUFFER_SIZE);
203+
let recv = run_recv_pump(ws_receiver, duplex_write);
204+
let send = run_send_pump(ws_sender, duplex_read);
205+
tokio::join!(recv, send);
206+
})
207+
}
208+
209+
async fn start_echo_server() -> String {
210+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
211+
let addr = listener.local_addr().unwrap();
212+
let app = axum::Router::new()
213+
.route("/ws", axum::routing::get(echo_handler))
214+
.with_state(EchoState);
215+
tokio::spawn(async move {
216+
axum::serve(listener, app).await.unwrap();
217+
});
218+
format!("ws://{}/ws", addr)
219+
}
220+
221+
#[tokio::test]
222+
async fn multiple_messages_round_trip() {
223+
let url = start_echo_server().await;
224+
let (mut ws, _) = connect_async(&url).await.unwrap();
225+
226+
let messages = vec!["alpha", "beta", "gamma"];
227+
for msg in &messages {
228+
ws.send(TungsteniteMessage::Text((*msg).into()))
229+
.await
230+
.unwrap();
231+
}
232+
233+
for expected in &messages {
234+
let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
235+
.await
236+
.expect("timeout")
237+
.expect("stream ended")
238+
.unwrap();
239+
match msg {
240+
TungsteniteMessage::Text(t) => assert_eq!(t, *expected),
241+
other => panic!("expected Text('{expected}'), got {other:?}"),
242+
}
243+
}
244+
}
245+
}

rsworkspace/crates/acp-nats-ws/src/main.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,67 @@ mod tests {
252252

253253
conn_thread.join().unwrap();
254254
}
255+
256+
#[tokio::test]
257+
async fn test_shutdown_while_connection_active() {
258+
let nats_mock = AdvancedMockNatsClient::new();
259+
let config = Config::new(
260+
acp_nats::AcpPrefix::new("acp").unwrap(),
261+
acp_nats::NatsConfig {
262+
servers: vec!["localhost:4222".to_string()],
263+
auth: trogon_nats::NatsAuth::None,
264+
},
265+
);
266+
267+
let _injector = nats_mock.inject_messages();
268+
269+
nats_mock.hang_next_request();
270+
271+
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
272+
let (conn_tx, conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>();
273+
274+
let nats_mock_clone = nats_mock.clone();
275+
let conn_thread = std::thread::Builder::new()
276+
.name(THREAD_NAME.into())
277+
.spawn(move || run_connection_thread(conn_rx, nats_mock_clone, config))
278+
.expect("failed to spawn connection thread");
279+
280+
let state = UpgradeState {
281+
conn_tx,
282+
shutdown_tx: shutdown_tx.clone(),
283+
};
284+
285+
let app = axum::Router::new()
286+
.route("/ws", axum::routing::get(upgrade::handle))
287+
.with_state(state);
288+
289+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
290+
let addr = listener.local_addr().unwrap();
291+
292+
let server_task = tokio::spawn(async move {
293+
axum::serve(listener, app)
294+
.with_graceful_shutdown(async move {
295+
let _ = shutdown_rx.changed().await;
296+
})
297+
.await
298+
.unwrap();
299+
});
300+
301+
let ws_url = format!("ws://{}/ws", addr);
302+
let (mut ws_stream, _) = connect_async(&ws_url).await.unwrap();
303+
304+
let req =
305+
r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion": 0}}"#;
306+
ws_stream.send(Message::Text(req.into())).await.unwrap();
307+
308+
tokio::time::sleep(Duration::from_millis(100)).await;
309+
310+
shutdown_tx.send(true).unwrap();
311+
312+
let _ = tokio::time::timeout(Duration::from_secs(5), server_task)
313+
.await
314+
.expect("server task did not shut down");
315+
316+
conn_thread.join().unwrap();
317+
}
255318
}

rsworkspace/crates/acp-nats-ws/src/upgrade.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,72 @@ pub async fn handle(ws: WebSocketUpgrade, State(state): State<UpgradeState>) ->
3030
}
3131
})
3232
}
33+
34+
#[cfg(test)]
35+
mod tests {
36+
use super::*;
37+
use futures_util::SinkExt;
38+
use std::time::Duration;
39+
use tokio::net::TcpListener;
40+
use tokio_tungstenite::connect_async;
41+
use tokio_tungstenite::tungstenite::Message;
42+
43+
#[tokio::test]
44+
async fn handle_sends_connection_request_through_channel() {
45+
let (shutdown_tx, _shutdown_rx) = watch::channel(false);
46+
let (conn_tx, mut conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>();
47+
48+
let state = UpgradeState {
49+
conn_tx,
50+
shutdown_tx: shutdown_tx.clone(),
51+
};
52+
53+
let app = axum::Router::new()
54+
.route("/ws", axum::routing::get(handle))
55+
.with_state(state);
56+
57+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
58+
let addr = listener.local_addr().unwrap();
59+
tokio::spawn(async move {
60+
axum::serve(listener, app).await.unwrap();
61+
});
62+
63+
let url = format!("ws://{}/ws", addr);
64+
let (_ws, _) = connect_async(&url).await.unwrap();
65+
66+
let req = tokio::time::timeout(Duration::from_secs(2), conn_rx.recv())
67+
.await
68+
.expect("timeout waiting for ConnectionRequest")
69+
.expect("channel closed");
70+
71+
assert!(!*req.shutdown_rx.borrow());
72+
}
73+
74+
#[tokio::test]
75+
async fn handle_logs_error_when_conn_rx_dropped() {
76+
let (shutdown_tx, _shutdown_rx) = watch::channel(false);
77+
let (conn_tx, conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>();
78+
79+
let state = UpgradeState {
80+
conn_tx,
81+
shutdown_tx: shutdown_tx.clone(),
82+
};
83+
84+
let app = axum::Router::new()
85+
.route("/ws", axum::routing::get(handle))
86+
.with_state(state);
87+
88+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
89+
let addr = listener.local_addr().unwrap();
90+
tokio::spawn(async move {
91+
axum::serve(listener, app).await.unwrap();
92+
});
93+
94+
drop(conn_rx);
95+
96+
let url = format!("ws://{}/ws", addr);
97+
let (_ws, _) = connect_async(&url).await.unwrap();
98+
99+
tokio::time::sleep(Duration::from_millis(100)).await;
100+
}
101+
}

rsworkspace/crates/acp-telemetry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "jso
1515
trogon-std = { path = "../trogon-std" }
1616

1717
[dev-dependencies]
18+
tokio = { version = "1.49.0", features = ["rt", "macros", "signal"] }
1819
trogon-std = { path = "../trogon-std", features = ["test-support"] }

rsworkspace/crates/acp-telemetry/src/log.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ fn platform_log_dir(service_name: ServiceName) -> Result<PathBuf, Box<dyn std::e
8181
#[cfg(test)]
8282
mod tests {
8383
use super::*;
84+
use opentelemetry::KeyValue;
8485
use trogon_std::env::InMemoryEnv;
8586
use trogon_std::fs::MemFs;
8687

@@ -109,4 +110,15 @@ mod tests {
109110
let dir = platform_log_dir(ServiceName::AcpNatsWs).unwrap();
110111
assert!(dir.ends_with(ServiceName::AcpNatsWs.as_str()));
111112
}
113+
114+
#[test]
115+
fn init_provider_lifecycle() {
116+
let resource = opentelemetry_sdk::Resource::builder()
117+
.with_service_name("test-log")
118+
.with_attributes(vec![KeyValue::new("test", "true")])
119+
.build();
120+
121+
let provider = init_provider(&resource);
122+
assert!(provider.is_ok());
123+
}
112124
}

rsworkspace/crates/acp-telemetry/src/metric.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,21 @@ pub(crate) fn shutdown() {
4242
eprintln!("Failed to shutdown meter provider: {e}");
4343
}
4444
}
45+
46+
#[cfg(test)]
47+
mod tests {
48+
use super::*;
49+
use opentelemetry::KeyValue;
50+
use opentelemetry_sdk::Resource;
51+
52+
#[test]
53+
fn init_provider_returns_valid_provider() {
54+
let resource = Resource::builder()
55+
.with_service_name("test-metric")
56+
.with_attributes(vec![KeyValue::new("test", "true")])
57+
.build();
58+
59+
let provider = init_provider(&resource);
60+
assert!(provider.is_ok());
61+
}
62+
}

rsworkspace/crates/acp-telemetry/src/trace.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,21 @@ pub(crate) fn shutdown() {
3232
eprintln!("Failed to shutdown tracer provider: {e}");
3333
}
3434
}
35+
36+
#[cfg(test)]
37+
mod tests {
38+
use super::*;
39+
use opentelemetry::KeyValue;
40+
use opentelemetry_sdk::Resource;
41+
42+
#[test]
43+
fn init_provider_returns_valid_provider() {
44+
let resource = Resource::builder()
45+
.with_service_name("test-trace")
46+
.with_attributes(vec![KeyValue::new("test", "true")])
47+
.build();
48+
49+
let provider = init_provider(&resource);
50+
assert!(provider.is_ok());
51+
}
52+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use acp_telemetry::ServiceName;
2+
use trogon_std::env::InMemoryEnv;
3+
use trogon_std::fs::MemFs;
4+
5+
#[test]
6+
fn init_logger_creates_log_dir_and_shuts_down_cleanly() {
7+
let env = InMemoryEnv::new();
8+
env.set("ACP_LOG_DIR", "/tmp/test-coverage-lifecycle");
9+
let fs = MemFs::new();
10+
11+
acp_telemetry::init_logger(ServiceName::AcpNatsStdio, "test", &env, &fs);
12+
13+
assert!(
14+
fs.dir_exists(&std::path::PathBuf::from("/tmp/test-coverage-lifecycle")),
15+
"init_logger should create the configured log directory"
16+
);
17+
18+
acp_telemetry::shutdown_otel();
19+
}

0 commit comments

Comments
 (0)