|
| 1 | +//! Integration test for the QUIC agent tunnel (Quinn). |
| 2 | +//! |
| 3 | +//! Verifies the full data path: |
| 4 | +//! TCP echo server ← Agent (Quinn client) ← QUIC mTLS ← Gateway listener ← TunnelStream |
| 5 | +//! |
| 6 | +//! This test runs entirely in-process with real UDP sockets on localhost. |
| 7 | +
|
| 8 | +#![allow(clippy::unwrap_used)] |
| 9 | + |
| 10 | +use std::net::SocketAddr; |
| 11 | +use std::sync::Arc; |
| 12 | +use std::time::Duration; |
| 13 | + |
| 14 | +use agent_tunnel_proto::{ConnectResponse, ControlMessage, ControlStream, DomainAdvertisement, SessionStream}; |
| 15 | +use camino::Utf8PathBuf; |
| 16 | +use ipnetwork::Ipv4Network; |
| 17 | +use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| 18 | +use tokio::net::{TcpListener, TcpStream}; |
| 19 | +use uuid::Uuid; |
| 20 | + |
| 21 | +use super::cert::CaManager; |
| 22 | +use super::listener::AgentTunnelListener; |
| 23 | + |
| 24 | +/// Start a TCP echo server that echoes back whatever it receives. |
| 25 | +async fn start_echo_server() -> (SocketAddr, tokio::task::JoinHandle<()>) { |
| 26 | + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |
| 27 | + let addr = listener.local_addr().unwrap(); |
| 28 | + |
| 29 | + let handle = tokio::spawn(async move { |
| 30 | + loop { |
| 31 | + let (mut stream, _) = match listener.accept().await { |
| 32 | + Ok(v) => v, |
| 33 | + Err(_) => break, |
| 34 | + }; |
| 35 | + |
| 36 | + tokio::spawn(async move { |
| 37 | + let mut buf = vec![0u8; 65535]; |
| 38 | + loop { |
| 39 | + let n = match stream.read(&mut buf).await { |
| 40 | + Ok(0) | Err(_) => break, |
| 41 | + Ok(n) => n, |
| 42 | + }; |
| 43 | + if stream.write_all(&buf[..n]).await.is_err() { |
| 44 | + break; |
| 45 | + } |
| 46 | + } |
| 47 | + }); |
| 48 | + } |
| 49 | + }); |
| 50 | + |
| 51 | + (addr, handle) |
| 52 | +} |
| 53 | + |
| 54 | +/// Generate a key pair and CSR (same as the real agent does during enrollment). |
| 55 | +fn generate_test_key_and_csr(agent_name: &str) -> (rcgen::KeyPair, String) { |
| 56 | + let key_pair = rcgen::KeyPair::generate_for(&rcgen::PKCS_ECDSA_P256_SHA256).expect("generate test key pair"); |
| 57 | + let mut params = rcgen::CertificateParams::default(); |
| 58 | + params.distinguished_name.push(rcgen::DnType::CommonName, agent_name); |
| 59 | + let csr = params.serialize_request(&key_pair).expect("serialize test CSR"); |
| 60 | + let csr_pem = csr.pem().expect("CSR to PEM"); |
| 61 | + (key_pair, csr_pem) |
| 62 | +} |
| 63 | + |
| 64 | +/// Create a Quinn client connection to the gateway with mTLS. |
| 65 | +async fn connect_quinn_client( |
| 66 | + ca_cert_pem: &str, |
| 67 | + client_cert_pem: &str, |
| 68 | + client_key_pem: &str, |
| 69 | + server_addr: SocketAddr, |
| 70 | +) -> quinn::Connection { |
| 71 | + use rustls_pemfile::{certs, private_key}; |
| 72 | + |
| 73 | + let _ = rustls::crypto::ring::default_provider().install_default(); |
| 74 | + |
| 75 | + // Parse client cert + key. |
| 76 | + let client_certs: Vec<rustls_pki_types::CertificateDer<'static>> = |
| 77 | + certs(&mut std::io::BufReader::new(client_cert_pem.as_bytes())) |
| 78 | + .collect::<Result<Vec<_>, _>>() |
| 79 | + .expect("parse client certs"); |
| 80 | + let client_key = private_key(&mut std::io::BufReader::new(client_key_pem.as_bytes())) |
| 81 | + .expect("parse private key") |
| 82 | + .expect("no private key found"); |
| 83 | + |
| 84 | + // Build root store with the CA cert. |
| 85 | + let mut roots = rustls::RootCertStore::empty(); |
| 86 | + let ca_certs: Vec<rustls_pki_types::CertificateDer<'static>> = |
| 87 | + certs(&mut std::io::BufReader::new(ca_cert_pem.as_bytes())) |
| 88 | + .collect::<Result<Vec<_>, _>>() |
| 89 | + .expect("parse CA certs"); |
| 90 | + for cert in ca_certs { |
| 91 | + roots.add(cert).expect("add CA cert to root store"); |
| 92 | + } |
| 93 | + |
| 94 | + // Build client config — skip hostname verification for test (connect by IP). |
| 95 | + let verifier = rustls::client::WebPkiServerVerifier::builder(Arc::new(roots)) |
| 96 | + .build() |
| 97 | + .expect("build verifier"); |
| 98 | + |
| 99 | + let mut client_crypto = rustls::ClientConfig::builder() |
| 100 | + .dangerous() |
| 101 | + .with_custom_certificate_verifier(verifier) |
| 102 | + .with_client_auth_cert(client_certs, client_key) |
| 103 | + .expect("client auth config"); |
| 104 | + |
| 105 | + client_crypto.alpn_protocols = vec![agent_tunnel_proto::ALPN_PROTOCOL.to_vec()]; |
| 106 | + |
| 107 | + let client_config = quinn::ClientConfig::new(Arc::new( |
| 108 | + quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto).expect("QUIC client config"), |
| 109 | + )); |
| 110 | + |
| 111 | + let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().expect("bind addr")).expect("create endpoint"); |
| 112 | + endpoint.set_default_client_config(client_config); |
| 113 | + |
| 114 | + endpoint |
| 115 | + .connect(server_addr, "localhost") |
| 116 | + .expect("initiate connection") |
| 117 | + .await |
| 118 | + .expect("QUIC handshake") |
| 119 | +} |
| 120 | + |
| 121 | +/// Full E2E integration test. |
| 122 | +/// |
| 123 | +/// 1. Start TCP echo server |
| 124 | +/// 2. Start QUIC listener (gateway, in-process) |
| 125 | +/// 3. Connect a simulated agent (Quinn client) with mTLS |
| 126 | +/// 4. Agent sends RouteAdvertise on control stream |
| 127 | +/// 5. Gateway opens a proxy stream via connect_via_agent |
| 128 | +/// 6. Agent reads ConnectRequest, connects to echo server, sends ConnectResponse::Success |
| 129 | +/// 7. Bidirectional data flows through the full tunnel |
| 130 | +/// 8. Verify echo response matches |
| 131 | +#[tokio::test] |
| 132 | +async fn quic_agent_tunnel_e2e() { |
| 133 | + // ── 1. Setup certificates ── |
| 134 | + |
| 135 | + let temp_dir = tempfile::tempdir().expect("create tempdir"); |
| 136 | + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); |
| 137 | + |
| 138 | + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); |
| 139 | + |
| 140 | + let agent_id = Uuid::new_v4(); |
| 141 | + let (key_pair, csr_pem) = generate_test_key_and_csr("test-agent"); |
| 142 | + let signed = ca_manager |
| 143 | + .sign_agent_csr(agent_id, "test-agent", &csr_pem, Some("localhost")) |
| 144 | + .expect("sign agent CSR"); |
| 145 | + |
| 146 | + // ── 2. Start TCP echo server ── |
| 147 | + |
| 148 | + let (echo_addr, _echo_handle) = start_echo_server().await; |
| 149 | + let echo_subnet: Ipv4Network = format!("{}/32", echo_addr.ip()).parse().unwrap(); |
| 150 | + |
| 151 | + // ── 3. Start QUIC listener (gateway) ── |
| 152 | + |
| 153 | + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); |
| 154 | + let (listener, handle) = AgentTunnelListener::bind(listen_addr, Arc::clone(&ca_manager), "localhost") |
| 155 | + .await |
| 156 | + .expect("bind QUIC listener"); |
| 157 | + |
| 158 | + let server_addr = listener.local_addr(); |
| 159 | + |
| 160 | + let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); |
| 161 | + let listener_task = tokio::spawn(async move { |
| 162 | + use devolutions_gateway_task::Task; |
| 163 | + let _ = listener.run(shutdown_signal).await; |
| 164 | + }); |
| 165 | + |
| 166 | + // Give listener time to be ready. |
| 167 | + tokio::time::sleep(Duration::from_millis(50)).await; |
| 168 | + |
| 169 | + // ── 4. Connect simulated agent (Quinn client with mTLS) ── |
| 170 | + |
| 171 | + let connection = connect_quinn_client( |
| 172 | + &signed.ca_cert_pem, |
| 173 | + &signed.client_cert_pem, |
| 174 | + &key_pair.serialize_pem(), |
| 175 | + server_addr, |
| 176 | + ) |
| 177 | + .await; |
| 178 | + |
| 179 | + // ── 5. Open control stream and send RouteAdvertise ── |
| 180 | + |
| 181 | + let mut ctrl: ControlStream<_, _> = connection.open_bi().await.expect("open control stream").into(); |
| 182 | + |
| 183 | + let route_msg = ControlMessage::route_advertise(1, vec![echo_subnet], vec![]); |
| 184 | + ctrl.send(&route_msg).await.expect("send RouteAdvertise"); |
| 185 | + |
| 186 | + // Give gateway time to process. |
| 187 | + tokio::time::sleep(Duration::from_millis(200)).await; |
| 188 | + |
| 189 | + // Verify agent is registered. |
| 190 | + assert!( |
| 191 | + handle.registry().get(&agent_id).await.is_some(), |
| 192 | + "agent should be registered in the registry" |
| 193 | + ); |
| 194 | + assert_eq!(handle.registry().online_count().await, 1); |
| 195 | + |
| 196 | + // ── 6. Gateway opens proxy stream ── |
| 197 | + |
| 198 | + let session_id = Uuid::new_v4(); |
| 199 | + let target_str = echo_addr.to_string(); |
| 200 | + |
| 201 | + let handle_clone = handle.clone(); |
| 202 | + let target_clone = target_str.clone(); |
| 203 | + let proxy_task = tokio::spawn(async move { |
| 204 | + handle_clone |
| 205 | + .connect_via_agent(agent_id, session_id, &target_clone) |
| 206 | + .await |
| 207 | + }); |
| 208 | + |
| 209 | + // ── 7. Agent accepts session stream ── |
| 210 | + |
| 211 | + let (send, recv) = connection |
| 212 | + .accept_bi() |
| 213 | + .await |
| 214 | + .expect("accept session stream from gateway"); |
| 215 | + let mut session: SessionStream<_, _> = (send, recv).into(); |
| 216 | + |
| 217 | + let connect_msg = session.recv_request().await.expect("recv ConnectRequest"); |
| 218 | + assert_eq!(connect_msg.session_id(), session_id); |
| 219 | + assert_eq!(connect_msg.target(), target_str); |
| 220 | + |
| 221 | + // Connect to echo server. |
| 222 | + let mut tcp_stream = TcpStream::connect(echo_addr).await.expect("connect to echo server"); |
| 223 | + |
| 224 | + // Send success response. |
| 225 | + session |
| 226 | + .send_response(&ConnectResponse::success()) |
| 227 | + .await |
| 228 | + .expect("send ConnectResponse::Success"); |
| 229 | + |
| 230 | + // ── 8. Wait for proxy task to complete ── |
| 231 | + |
| 232 | + let tunnel_stream = tokio::time::timeout(Duration::from_secs(5), proxy_task) |
| 233 | + .await |
| 234 | + .expect("proxy task should complete in time") |
| 235 | + .expect("proxy task should not panic") |
| 236 | + .expect("connect_via_agent should succeed"); |
| 237 | + |
| 238 | + // ── 9. Bidirectional data test ── |
| 239 | + |
| 240 | + let test_data = b"Hello from the Quinn E2E integration test!"; |
| 241 | + let (mut quic_read, mut quic_write) = tokio::io::split(tunnel_stream); |
| 242 | + |
| 243 | + // Gateway writes test data. |
| 244 | + quic_write.write_all(test_data).await.expect("write to TunnelStream"); |
| 245 | + |
| 246 | + // Agent relays: QUIC → TCP echo → QUIC. |
| 247 | + let (mut session_send, mut session_recv) = session.into_inner(); |
| 248 | + let mut relay_buf = vec![0u8; test_data.len()]; |
| 249 | + session_recv |
| 250 | + .read_exact(&mut relay_buf) |
| 251 | + .await |
| 252 | + .expect("read from QUIC session stream"); |
| 253 | + assert_eq!(&relay_buf, test_data); |
| 254 | + |
| 255 | + // Forward to echo server. |
| 256 | + tcp_stream.write_all(&relay_buf).await.expect("write to echo server"); |
| 257 | + |
| 258 | + // Read echo response. |
| 259 | + let mut echo_buf = vec![0u8; test_data.len()]; |
| 260 | + tcp_stream.read_exact(&mut echo_buf).await.expect("read echo response"); |
| 261 | + assert_eq!(&echo_buf, test_data); |
| 262 | + |
| 263 | + // Send echo response back through QUIC. |
| 264 | + session_send |
| 265 | + .write_all(&echo_buf) |
| 266 | + .await |
| 267 | + .expect("write echo response to QUIC"); |
| 268 | + let _ = session_send.finish(); |
| 269 | + |
| 270 | + // Gateway reads the echoed data. |
| 271 | + let mut response_buf = vec![0u8; test_data.len()]; |
| 272 | + quic_read |
| 273 | + .read_exact(&mut response_buf) |
| 274 | + .await |
| 275 | + .expect("read from TunnelStream"); |
| 276 | + assert_eq!(&response_buf, test_data, "echo response should match"); |
| 277 | + |
| 278 | + // ── 10. Cleanup ── |
| 279 | + |
| 280 | + connection.close(0u32.into(), b"test done"); |
| 281 | + shutdown_handle.signal(); |
| 282 | + let _ = tokio::time::timeout(Duration::from_secs(2), listener_task).await; |
| 283 | +} |
| 284 | + |
| 285 | +/// Domain routing E2E test. |
| 286 | +/// |
| 287 | +/// Same as above but agent advertises domain "test.local" alongside subnet. |
| 288 | +/// Verifies domain appears in the registry. |
| 289 | +#[tokio::test] |
| 290 | +async fn quic_agent_tunnel_domain_routing_e2e() { |
| 291 | + let temp_dir = tempfile::tempdir().expect("create tempdir"); |
| 292 | + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); |
| 293 | + |
| 294 | + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); |
| 295 | + |
| 296 | + let agent_id = Uuid::new_v4(); |
| 297 | + let (key_pair, csr_pem) = generate_test_key_and_csr("domain-agent"); |
| 298 | + let signed = ca_manager |
| 299 | + .sign_agent_csr(agent_id, "domain-agent", &csr_pem, Some("localhost")) |
| 300 | + .expect("sign agent CSR"); |
| 301 | + |
| 302 | + let (echo_addr, _echo_handle) = start_echo_server().await; |
| 303 | + let echo_subnet: Ipv4Network = format!("{}/32", echo_addr.ip()).parse().unwrap(); |
| 304 | + |
| 305 | + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); |
| 306 | + let (listener, handle) = AgentTunnelListener::bind(listen_addr, Arc::clone(&ca_manager), "localhost") |
| 307 | + .await |
| 308 | + .expect("bind QUIC listener"); |
| 309 | + |
| 310 | + let server_addr = listener.local_addr(); |
| 311 | + |
| 312 | + let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); |
| 313 | + let listener_task = tokio::spawn(async move { |
| 314 | + use devolutions_gateway_task::Task; |
| 315 | + let _ = listener.run(shutdown_signal).await; |
| 316 | + }); |
| 317 | + |
| 318 | + tokio::time::sleep(Duration::from_millis(50)).await; |
| 319 | + |
| 320 | + let connection = connect_quinn_client( |
| 321 | + &signed.ca_cert_pem, |
| 322 | + &signed.client_cert_pem, |
| 323 | + &key_pair.serialize_pem(), |
| 324 | + server_addr, |
| 325 | + ) |
| 326 | + .await; |
| 327 | + |
| 328 | + // Send RouteAdvertise with domain. |
| 329 | + let mut ctrl: ControlStream<_, _> = connection.open_bi().await.expect("open control stream").into(); |
| 330 | + |
| 331 | + let domains = vec![DomainAdvertisement { |
| 332 | + domain: agent_tunnel_proto::DomainName::new("test.local"), |
| 333 | + auto_detected: false, |
| 334 | + }]; |
| 335 | + let route_msg = ControlMessage::route_advertise(1, vec![echo_subnet], domains); |
| 336 | + ctrl.send(&route_msg).await.expect("send RouteAdvertise"); |
| 337 | + |
| 338 | + tokio::time::sleep(Duration::from_millis(200)).await; |
| 339 | + |
| 340 | + // Verify agent + domain registered. |
| 341 | + let peer = handle |
| 342 | + .registry() |
| 343 | + .get(&agent_id) |
| 344 | + .await |
| 345 | + .expect("agent should be registered"); |
| 346 | + |
| 347 | + let route_state = peer.route_state(); |
| 348 | + assert_eq!(route_state.domains.len(), 1); |
| 349 | + assert_eq!(route_state.domains[0].domain.as_str(), "test.local"); |
| 350 | + assert!(!route_state.domains[0].auto_detected); |
| 351 | + |
| 352 | + // Cleanup. |
| 353 | + connection.close(0u32.into(), b"test done"); |
| 354 | + shutdown_handle.signal(); |
| 355 | + let _ = tokio::time::timeout(Duration::from_secs(2), listener_task).await; |
| 356 | +} |
0 commit comments