|
| 1 | +//! End-to-end integration tests: WebSocket bridge + real RpcServer + real NATS. |
| 2 | +//! |
| 3 | +//! These tests verify the full ACP request-reply flow: |
| 4 | +//! WS client → acp-nats-ws → NATS → RpcServer (trogon-acp-runner) → back |
| 5 | +//! |
| 6 | +//! Requires Docker (testcontainers starts a NATS server with JetStream). |
| 7 | +//! |
| 8 | +//! Run with: |
| 9 | +//! cargo test -p acp-nats-ws --test e2e_runner |
| 10 | +
|
| 11 | +use std::sync::Arc; |
| 12 | +use std::time::Duration; |
| 13 | + |
| 14 | +use acp_nats::{AcpPrefix, Config, NatsAuth, NatsConfig}; |
| 15 | +use acp_nats_ws::upgrade::{ConnectionRequest, UpgradeState}; |
| 16 | +use acp_nats_ws::{THREAD_NAME, run_connection_thread, upgrade}; |
| 17 | +use async_nats::jetstream; |
| 18 | +use futures_util::{SinkExt, StreamExt}; |
| 19 | +use testcontainers_modules::nats::Nats; |
| 20 | +use testcontainers_modules::testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner}; |
| 21 | +use tokio::net::TcpListener; |
| 22 | +use tokio::sync::{RwLock, mpsc, watch}; |
| 23 | +use tokio_tungstenite::connect_async; |
| 24 | +use tokio_tungstenite::tungstenite::Message; |
| 25 | +use trogon_acp_runner::{RpcServer, SessionStore}; |
| 26 | + |
| 27 | +// ── helpers ─────────────────────────────────────────────────────────────────── |
| 28 | + |
| 29 | +async fn start_nats() -> (ContainerAsync<Nats>, async_nats::Client, jetstream::Context, u16) { |
| 30 | + let container = Nats::default() |
| 31 | + .with_cmd(["--jetstream"]) |
| 32 | + .start() |
| 33 | + .await |
| 34 | + .expect("Failed to start NATS container — is Docker running?"); |
| 35 | + let port = container.get_host_port_ipv4(4222).await.unwrap(); |
| 36 | + let nats = async_nats::connect(format!("127.0.0.1:{port}")) |
| 37 | + .await |
| 38 | + .expect("connect to NATS"); |
| 39 | + let js = jetstream::new(nats.clone()); |
| 40 | + (container, nats, js, port) |
| 41 | +} |
| 42 | + |
| 43 | +fn make_config(nats_port: u16) -> Config { |
| 44 | + Config::new( |
| 45 | + AcpPrefix::new("acp").unwrap(), |
| 46 | + NatsConfig { |
| 47 | + servers: vec![format!("127.0.0.1:{nats_port}")], |
| 48 | + auth: NatsAuth::None, |
| 49 | + }, |
| 50 | + ) |
| 51 | + .with_operation_timeout(Duration::from_secs(5)) |
| 52 | +} |
| 53 | + |
| 54 | +async fn start_rpc_server(nats: async_nats::Client, js: jetstream::Context) -> SessionStore { |
| 55 | + let store = SessionStore::open(&js).await.unwrap(); |
| 56 | + let store_clone = store.clone(); |
| 57 | + let gateway_config = Arc::new(RwLock::new(None)); |
| 58 | + let server = RpcServer::new(nats, store_clone, "acp", gateway_config); |
| 59 | + tokio::spawn(async move { server.run().await }); |
| 60 | + tokio::time::sleep(Duration::from_millis(50)).await; |
| 61 | + store |
| 62 | +} |
| 63 | + |
| 64 | +async fn start_ws_server( |
| 65 | + nats_port: u16, |
| 66 | +) -> (String, watch::Sender<bool>, std::thread::JoinHandle<()>) { |
| 67 | + let nats_client = async_nats::connect(format!("127.0.0.1:{nats_port}")) |
| 68 | + .await |
| 69 | + .expect("connect to NATS for WS bridge"); |
| 70 | + let config = make_config(nats_port); |
| 71 | + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); |
| 72 | + let (conn_tx, conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>(); |
| 73 | + |
| 74 | + let conn_thread = std::thread::Builder::new() |
| 75 | + .name(THREAD_NAME.into()) |
| 76 | + .spawn(move || run_connection_thread(conn_rx, nats_client, config)) |
| 77 | + .expect("failed to spawn connection thread"); |
| 78 | + |
| 79 | + let state = UpgradeState { |
| 80 | + conn_tx, |
| 81 | + shutdown_tx: shutdown_tx.clone(), |
| 82 | + }; |
| 83 | + |
| 84 | + let app = axum::Router::new() |
| 85 | + .route("/ws", axum::routing::get(upgrade::handle)) |
| 86 | + .with_state(state); |
| 87 | + |
| 88 | + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |
| 89 | + let addr = listener.local_addr().unwrap(); |
| 90 | + |
| 91 | + tokio::spawn(async move { |
| 92 | + axum::serve(listener, app) |
| 93 | + .with_graceful_shutdown(async move { |
| 94 | + let _ = shutdown_rx.changed().await; |
| 95 | + }) |
| 96 | + .await |
| 97 | + .unwrap(); |
| 98 | + }); |
| 99 | + |
| 100 | + (format!("ws://{addr}/ws"), shutdown_tx, conn_thread) |
| 101 | +} |
| 102 | + |
| 103 | +/// Read the next Text message from a WS stream, skipping non-Text frames. |
| 104 | +async fn next_text(ws: &mut tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>) -> String { |
| 105 | + loop { |
| 106 | + match ws.next().await { |
| 107 | + Some(Ok(Message::Text(t))) => return t.to_string(), |
| 108 | + Some(Ok(_)) => continue, |
| 109 | + other => panic!("unexpected ws message: {other:?}"), |
| 110 | + } |
| 111 | + } |
| 112 | +} |
| 113 | + |
| 114 | +// ── tests ───────────────────────────────────────────────────────────────────── |
| 115 | + |
| 116 | +/// Full E2E: WS client → bridge → NATS → RpcServer → back. |
| 117 | +/// The RpcServer handles `initialize` and returns capabilities. |
| 118 | +#[tokio::test] |
| 119 | +async fn e2e_initialize_returns_protocol_version_and_capabilities() { |
| 120 | + let (_container, nats, js, nats_port) = start_nats().await; |
| 121 | + let _ = start_rpc_server(nats, js).await; |
| 122 | + let (ws_url, shutdown_tx, conn_thread) = start_ws_server(nats_port).await; |
| 123 | + |
| 124 | + let (mut ws, _) = connect_async(&ws_url).await.unwrap(); |
| 125 | + |
| 126 | + let req = r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":0}}"#; |
| 127 | + ws.send(Message::Text(req.into())).await.unwrap(); |
| 128 | + |
| 129 | + let text = tokio::time::timeout(Duration::from_secs(10), next_text(&mut ws)) |
| 130 | + .await |
| 131 | + .expect("timed out waiting for initialize response"); |
| 132 | + |
| 133 | + let val: serde_json::Value = serde_json::from_str(&text).unwrap(); |
| 134 | + assert_eq!(val["id"], 1, "response id must match request id"); |
| 135 | + assert!( |
| 136 | + val["result"]["protocolVersion"].is_number(), |
| 137 | + "must have protocolVersion: {text}" |
| 138 | + ); |
| 139 | + assert!( |
| 140 | + val["result"]["agentCapabilities"]["loadSession"].as_bool().unwrap_or(false), |
| 141 | + "must advertise loadSession: {text}" |
| 142 | + ); |
| 143 | + |
| 144 | + shutdown_tx.send(true).unwrap(); |
| 145 | + let _ = tokio::task::spawn_blocking(move || conn_thread.join()).await; |
| 146 | +} |
| 147 | + |
| 148 | +/// E2E new_session: bridge → NATS → RpcServer creates session → client gets session ID. |
| 149 | +#[tokio::test] |
| 150 | +async fn e2e_new_session_returns_session_id() { |
| 151 | + let (_container, nats, js, nats_port) = start_nats().await; |
| 152 | + let store = start_rpc_server(nats, js).await; |
| 153 | + let (ws_url, shutdown_tx, conn_thread) = start_ws_server(nats_port).await; |
| 154 | + |
| 155 | + let (mut ws, _) = connect_async(&ws_url).await.unwrap(); |
| 156 | + |
| 157 | + let req = r#"{"jsonrpc":"2.0","id":2,"method":"session/new","params":{"cwd":"/tmp","mcpServers":[]}}"#; |
| 158 | + ws.send(Message::Text(req.into())).await.unwrap(); |
| 159 | + |
| 160 | + let text = tokio::time::timeout(Duration::from_secs(10), next_text(&mut ws)) |
| 161 | + .await |
| 162 | + .expect("timed out waiting for session/new response"); |
| 163 | + |
| 164 | + let val: serde_json::Value = serde_json::from_str(&text).unwrap(); |
| 165 | + assert_eq!(val["id"], 2); |
| 166 | + let session_id = val["result"]["sessionId"] |
| 167 | + .as_str() |
| 168 | + .unwrap_or_else(|| panic!("must have sessionId in response: {text}")); |
| 169 | + assert!(!session_id.is_empty(), "sessionId must not be empty"); |
| 170 | + |
| 171 | + // Verify the session was persisted in the store. |
| 172 | + let state = store.load(session_id).await.unwrap(); |
| 173 | + assert_eq!(state.cwd, "/tmp"); |
| 174 | + |
| 175 | + shutdown_tx.send(true).unwrap(); |
| 176 | + let _ = tokio::task::spawn_blocking(move || conn_thread.join()).await; |
| 177 | +} |
| 178 | + |
| 179 | +/// E2E list_sessions: after creating two sessions, listing returns both. |
| 180 | +#[tokio::test] |
| 181 | +async fn e2e_list_sessions_returns_created_sessions() { |
| 182 | + let (_container, nats, js, nats_port) = start_nats().await; |
| 183 | + let _ = start_rpc_server(nats, js).await; |
| 184 | + let (ws_url, shutdown_tx, conn_thread) = start_ws_server(nats_port).await; |
| 185 | + |
| 186 | + let (mut ws, _) = connect_async(&ws_url).await.unwrap(); |
| 187 | + |
| 188 | + // Create two sessions. |
| 189 | + for (id, cwd) in [(3, "/proj1"), (4, "/proj2")] { |
| 190 | + let req = format!( |
| 191 | + r#"{{"jsonrpc":"2.0","id":{id},"method":"session/new","params":{{"cwd":"{cwd}","mcpServers":[]}}}}"# |
| 192 | + ); |
| 193 | + ws.send(Message::Text(req.into())).await.unwrap(); |
| 194 | + tokio::time::timeout(Duration::from_secs(10), next_text(&mut ws)) |
| 195 | + .await |
| 196 | + .expect("timed out waiting for session/new"); |
| 197 | + } |
| 198 | + |
| 199 | + // List sessions. |
| 200 | + let req = r#"{"jsonrpc":"2.0","id":5,"method":"session/list","params":{}}"#; |
| 201 | + ws.send(Message::Text(req.into())).await.unwrap(); |
| 202 | + let text = tokio::time::timeout(Duration::from_secs(10), next_text(&mut ws)) |
| 203 | + .await |
| 204 | + .expect("timed out waiting for session/list"); |
| 205 | + |
| 206 | + let val: serde_json::Value = serde_json::from_str(&text).unwrap(); |
| 207 | + assert_eq!(val["id"], 5); |
| 208 | + let sessions = val["result"]["sessions"].as_array().expect("must have sessions array"); |
| 209 | + assert_eq!(sessions.len(), 2, "expected 2 sessions: {text}"); |
| 210 | + |
| 211 | + shutdown_tx.send(true).unwrap(); |
| 212 | + let _ = tokio::task::spawn_blocking(move || conn_thread.join()).await; |
| 213 | +} |
| 214 | + |
| 215 | +/// E2E authenticate: bridge routes authenticate to RpcServer, which replies with empty response. |
| 216 | +#[tokio::test] |
| 217 | +async fn e2e_authenticate_returns_ok() { |
| 218 | + let (_container, nats, js, nats_port) = start_nats().await; |
| 219 | + let _ = start_rpc_server(nats, js).await; |
| 220 | + let (ws_url, shutdown_tx, conn_thread) = start_ws_server(nats_port).await; |
| 221 | + |
| 222 | + let (mut ws, _) = connect_async(&ws_url).await.unwrap(); |
| 223 | + |
| 224 | + let req = r#"{"jsonrpc":"2.0","id":6,"method":"authenticate","params":{"methodId":"password"}}"#; |
| 225 | + ws.send(Message::Text(req.into())).await.unwrap(); |
| 226 | + |
| 227 | + let text = tokio::time::timeout(Duration::from_secs(10), next_text(&mut ws)) |
| 228 | + .await |
| 229 | + .expect("timed out waiting for authenticate response"); |
| 230 | + |
| 231 | + let val: serde_json::Value = serde_json::from_str(&text).unwrap(); |
| 232 | + assert_eq!(val["id"], 6); |
| 233 | + assert!(val["result"].is_object(), "must have result: {text}"); |
| 234 | + assert!(val["error"].is_null(), "must not have error: {text}"); |
| 235 | + |
| 236 | + shutdown_tx.send(true).unwrap(); |
| 237 | + let _ = tokio::task::spawn_blocking(move || conn_thread.join()).await; |
| 238 | +} |
0 commit comments