Skip to content

Commit c80991a

Browse files
committed
Fix ws-upstream relay: defer upstream connection to main runtime
The host_ws_upgrade host function was connecting to the upstream WebSocket on a temporary single-threaded tokio runtime that was destroyed immediately after block_on returned. The TcpStream inside the WebSocket was bound to that dead runtime's I/O driver, so all reads/writes in the relay task failed instantly (close code 1006). The fix stores the WsUpgradeRequest in PluginState and defers the actual TCP connection to the main async context where the relay task runs, ensuring the I/O driver stays alive for the lifetime of the connection.
1 parent 1d4807a commit c80991a

5 files changed

Lines changed: 253 additions & 75 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/barbacane-test/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ rcgen = { workspace = true }
1818
rustls = { workspace = true }
1919
base64 = { workspace = true }
2020
wiremock = "0.6"
21+
tokio-tungstenite = { workspace = true }
22+
futures-util = { workspace = true }
2123
assert_cmd = { workspace = true }
2224
predicates = { workspace = true }
2325

crates/barbacane-test/tests/plugins.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,3 +1234,195 @@ async fn test_ws_upstream_upstream_unavailable() {
12341234
resp.status()
12351235
);
12361236
}
1237+
1238+
/// Start a simple WebSocket echo server on a random port.
1239+
/// Returns the `(join_handle, "ws://127.0.0.1:PORT")`.
1240+
fn start_ws_echo_server() -> (tokio::task::JoinHandle<()>, String) {
1241+
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1242+
let port = listener.local_addr().unwrap().port();
1243+
let url = format!("ws://127.0.0.1:{}", port);
1244+
1245+
// Convert to a tokio TcpListener inside the spawned task
1246+
listener.set_nonblocking(true).unwrap();
1247+
1248+
let handle = tokio::spawn(async move {
1249+
use futures_util::{SinkExt, StreamExt};
1250+
1251+
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
1252+
while let Ok((stream, _)) = listener.accept().await {
1253+
tokio::spawn(async move {
1254+
let ws = tokio_tungstenite::accept_async(stream).await.unwrap();
1255+
let (mut tx, mut rx) = ws.split();
1256+
while let Some(Ok(msg)) = rx.next().await {
1257+
if msg.is_close() {
1258+
break;
1259+
}
1260+
if msg.is_text() || msg.is_binary() {
1261+
if tx.send(msg).await.is_err() {
1262+
break;
1263+
}
1264+
}
1265+
}
1266+
});
1267+
}
1268+
});
1269+
1270+
(handle, url)
1271+
}
1272+
1273+
/// Create a temporary spec + manifest for ws-upstream relay tests.
1274+
fn create_ws_spec(upstream_url: &str) -> (tempfile::TempDir, std::path::PathBuf) {
1275+
let temp_dir = tempfile::TempDir::new().expect("failed to create temp dir");
1276+
let spec_path = temp_dir.path().join("ws-relay.yaml");
1277+
1278+
let ws_wasm = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1279+
.parent()
1280+
.unwrap()
1281+
.parent()
1282+
.unwrap()
1283+
.join("plugins/ws-upstream/ws-upstream.wasm");
1284+
let mock_wasm = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1285+
.parent()
1286+
.unwrap()
1287+
.parent()
1288+
.unwrap()
1289+
.join("plugins/mock/mock.wasm");
1290+
1291+
let manifest = format!(
1292+
"plugins:\n ws-upstream:\n path: {ws}\n mock:\n path: {mock}\n",
1293+
ws = ws_wasm.display(),
1294+
mock = mock_wasm.display(),
1295+
);
1296+
std::fs::write(temp_dir.path().join("barbacane.yaml"), manifest).expect("write manifest");
1297+
1298+
let spec = format!(
1299+
r#"openapi: "3.1.0"
1300+
info:
1301+
title: WS Relay Test
1302+
version: "1.0.0"
1303+
1304+
paths:
1305+
/health:
1306+
get:
1307+
operationId: health
1308+
x-barbacane-dispatch:
1309+
name: mock
1310+
config:
1311+
status: 200
1312+
body: '{{"ok":true}}'
1313+
responses:
1314+
"200":
1315+
description: OK
1316+
1317+
/ws/echo:
1318+
get:
1319+
operationId: wsEcho
1320+
x-barbacane-dispatch:
1321+
name: ws-upstream
1322+
config:
1323+
url: "{upstream}"
1324+
responses:
1325+
"101":
1326+
description: Switching Protocols
1327+
"#,
1328+
upstream = upstream_url,
1329+
);
1330+
std::fs::write(&spec_path, spec).expect("write spec");
1331+
1332+
(temp_dir, spec_path)
1333+
}
1334+
1335+
#[tokio::test]
1336+
async fn test_ws_relay_stays_alive_and_echoes_frames() {
1337+
use futures_util::{SinkExt, StreamExt};
1338+
use tokio_tungstenite::tungstenite::Message;
1339+
1340+
// Start a WS echo server
1341+
let (_echo_handle, echo_url) = start_ws_echo_server();
1342+
1343+
// Create spec pointing to the echo server and boot the gateway
1344+
let (_temp_dir, spec_path) = create_ws_spec(&echo_url);
1345+
let gateway = TestGateway::from_spec(spec_path.to_str().unwrap())
1346+
.await
1347+
.expect("failed to start gateway");
1348+
1349+
// Connect through the gateway
1350+
let ws_url = format!("ws://127.0.0.1:{}/ws/echo", gateway.port());
1351+
let (mut ws, _) = tokio_tungstenite::connect_async(&ws_url)
1352+
.await
1353+
.expect("WebSocket connection through gateway failed");
1354+
1355+
// Send multiple text frames and verify each echoes back
1356+
for i in 0..5 {
1357+
let msg = format!("hello {}", i);
1358+
ws.send(Message::Text(msg.clone().into()))
1359+
.await
1360+
.expect("send failed");
1361+
1362+
let reply = tokio::time::timeout(std::time::Duration::from_secs(5), ws.next())
1363+
.await
1364+
.expect("timed out waiting for echo")
1365+
.expect("stream ended")
1366+
.expect("read error");
1367+
1368+
assert_eq!(reply, Message::Text(msg.into()), "frame {} did not echo", i);
1369+
}
1370+
1371+
// Send a binary frame too
1372+
ws.send(Message::Binary(vec![1, 2, 3].into()))
1373+
.await
1374+
.expect("binary send failed");
1375+
1376+
let reply = tokio::time::timeout(std::time::Duration::from_secs(5), ws.next())
1377+
.await
1378+
.expect("timed out waiting for binary echo")
1379+
.expect("stream ended")
1380+
.expect("read error");
1381+
1382+
assert_eq!(reply, Message::Binary(vec![1, 2, 3].into()));
1383+
1384+
// Clean close
1385+
ws.close(None).await.expect("close failed");
1386+
}
1387+
1388+
#[tokio::test]
1389+
async fn test_ws_relay_upstream_close_propagates_to_client() {
1390+
use futures_util::{SinkExt, StreamExt};
1391+
use tokio_tungstenite::tungstenite::Message;
1392+
1393+
let (_echo_handle, echo_url) = start_ws_echo_server();
1394+
let (_temp_dir, spec_path) = create_ws_spec(&echo_url);
1395+
let gateway = TestGateway::from_spec(spec_path.to_str().unwrap())
1396+
.await
1397+
.expect("failed to start gateway");
1398+
1399+
let ws_url = format!("ws://127.0.0.1:{}/ws/echo", gateway.port());
1400+
let (mut ws, _) = tokio_tungstenite::connect_async(&ws_url)
1401+
.await
1402+
.expect("WebSocket connection through gateway failed");
1403+
1404+
// Verify the connection works
1405+
ws.send(Message::Text("ping".into()))
1406+
.await
1407+
.expect("send failed");
1408+
1409+
let reply = tokio::time::timeout(std::time::Duration::from_secs(5), ws.next())
1410+
.await
1411+
.expect("timed out")
1412+
.expect("stream ended")
1413+
.expect("read error");
1414+
1415+
assert_eq!(reply, Message::Text("ping".into()));
1416+
1417+
// Send close and verify the stream ends cleanly
1418+
ws.send(Message::Close(None)).await.expect("close failed");
1419+
1420+
// The next read should be a close frame, end-of-stream, or a reset
1421+
// (the proxy may drop the connection without a close handshake).
1422+
let final_msg = tokio::time::timeout(std::time::Duration::from_secs(5), ws.next()).await;
1423+
match final_msg {
1424+
Ok(Some(Ok(Message::Close(_)))) | Ok(None) => {} // Clean close
1425+
Ok(Some(Err(_))) => {} // Reset (proxy dropped connection)
1426+
other => panic!("expected close or end-of-stream, got {:?}", other),
1427+
}
1428+
}

crates/barbacane-wasm/src/instance.rs

Lines changed: 29 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,13 @@ pub struct PluginState {
137137
/// each body chunk, then drops the sender to signal end-of-stream.
138138
pub stream_sender: Option<Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>>,
139139

140-
/// Upstream WebSocket connection established by `host_ws_upgrade` (ADR-0026).
140+
/// Upstream WebSocket upgrade request from `host_ws_upgrade` (ADR-0026).
141141
///
142-
/// After a successful `host_ws_upgrade`, the connected stream is stored here
143-
/// for the data plane to pick up and relay frames bidirectionally.
144-
pub ws_upstream: Option<crate::ws_client::UpstreamWsStream>,
142+
/// After a successful `host_ws_upgrade`, the request params are stored here.
143+
/// The actual connection is deferred to the async relay task on the main
144+
/// runtime, because the TcpStream must be created on the runtime that will
145+
/// drive it (a temporary runtime's I/O driver dies when the runtime drops).
146+
pub ws_upgrade_request: Option<crate::ws_client::WsUpgradeRequest>,
145147
}
146148

147149
#[allow(dead_code)] // Constructors used by different pool configurations
@@ -167,7 +169,7 @@ impl PluginState {
167169
last_broker_result: None,
168170
last_uuid_result: None,
169171
stream_sender: None,
170-
ws_upstream: None,
172+
ws_upgrade_request: None,
171173
}
172174
}
173175

@@ -196,7 +198,7 @@ impl PluginState {
196198
last_broker_result: None,
197199
last_uuid_result: None,
198200
stream_sender: None,
199-
ws_upstream: None,
201+
ws_upgrade_request: None,
200202
}
201203
}
202204

@@ -226,7 +228,7 @@ impl PluginState {
226228
last_broker_result: None,
227229
last_uuid_result: None,
228230
stream_sender: None,
229-
ws_upstream: None,
231+
ws_upgrade_request: None,
230232
}
231233
}
232234

@@ -261,7 +263,7 @@ impl PluginState {
261263
last_broker_result: None,
262264
last_uuid_result: None,
263265
stream_sender: None,
264-
ws_upstream: None,
266+
ws_upgrade_request: None,
265267
}
266268
}
267269

@@ -297,7 +299,7 @@ impl PluginState {
297299
last_broker_result: None,
298300
last_uuid_result: None,
299301
stream_sender: None,
300-
ws_upstream: None,
302+
ws_upgrade_request: None,
301303
}
302304
}
303305

@@ -319,9 +321,9 @@ impl PluginState {
319321
self.stream_sender = Some(sender);
320322
}
321323

322-
/// Take the upstream WebSocket connection established by host_ws_upgrade (ADR-0026).
323-
pub fn take_ws_upstream(&mut self) -> Option<crate::ws_client::UpstreamWsStream> {
324-
self.ws_upstream.take()
324+
/// Take the upstream WebSocket upgrade request from host_ws_upgrade (ADR-0026).
325+
pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
326+
self.ws_upgrade_request.take()
325327
}
326328
}
327329

@@ -620,12 +622,12 @@ impl PluginInstance {
620622
self.store.data_mut().set_stream_sender(sender);
621623
}
622624

623-
/// Take the upstream WebSocket connection established by `host_ws_upgrade` (ADR-0026).
625+
/// Take the upstream WebSocket upgrade request from `host_ws_upgrade` (ADR-0026).
624626
///
625-
/// Returns `None` if no WebSocket upgrade was performed or the connection
627+
/// Returns `None` if no WebSocket upgrade was requested or the request
626628
/// was already taken.
627-
pub fn take_ws_upstream(&mut self) -> Option<crate::ws_client::UpstreamWsStream> {
628-
self.store.data_mut().take_ws_upstream()
629+
pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
630+
self.store.data_mut().take_ws_upgrade_request()
629631
}
630632
}
631633

@@ -1159,13 +1161,14 @@ fn add_host_functions(linker: &mut Linker<PluginState>) -> Result<(), WasmError>
11591161
)
11601162
.map_err(|e| WasmError::Instantiation(format!("failed to add host_http_stream: {}", e)))?;
11611163

1162-
// host_ws_upgrade - connect to an upstream WebSocket endpoint (ADR-0026)
1164+
// host_ws_upgrade - request an upstream WebSocket connection (ADR-0026)
11631165
//
11641166
// The plugin sends a JSON payload: { url, connect_timeout_ms, headers }.
1165-
// On success: the upstream WebSocket connection is stored in PluginState
1166-
// for the data plane to pick up, and returns 0.
1167-
// On failure: the error string is stored in last_http_result (readable via
1168-
// host_http_read_result), and returns -1.
1167+
// The request is validated and stored in PluginState. The actual TCP
1168+
// connection is deferred to the async relay task on the main tokio runtime,
1169+
// because a TcpStream must be created on the runtime that will drive it
1170+
// (a temporary runtime's I/O driver dies when the runtime drops).
1171+
// Returns 0 on success (valid request), -1 on parse failure.
11691172
linker
11701173
.func_wrap(
11711174
"barbacane",
@@ -1199,54 +1202,13 @@ fn add_host_functions(linker: &mut Linker<PluginState>) -> Result<(), WasmError>
11991202
tracing::debug!(
12001203
plugin = %plugin_name,
12011204
url = %ws_request.url,
1202-
"host_ws_upgrade: connecting to upstream"
1205+
"host_ws_upgrade: storing request for deferred connection"
12031206
);
12041207

1205-
// Connect to upstream WebSocket on a dedicated tokio runtime
1206-
let result = std::thread::scope(|s| {
1207-
let handle = s.spawn(|| {
1208-
let rt = match tokio::runtime::Builder::new_current_thread()
1209-
.enable_all()
1210-
.build()
1211-
{
1212-
Ok(rt) => rt,
1213-
Err(e) => {
1214-
tracing::error!("host_ws_upgrade: failed to create runtime: {}", e);
1215-
return Err(format!("runtime error: {}", e));
1216-
}
1217-
};
1218-
1219-
rt.block_on(crate::ws_client::connect_upstream(ws_request))
1220-
});
1221-
1222-
match handle.join() {
1223-
Ok(result) => result,
1224-
Err(e) => {
1225-
tracing::error!("host_ws_upgrade: worker thread panicked: {:?}", e);
1226-
Err("internal error: worker thread panicked".to_string())
1227-
}
1228-
}
1229-
});
1230-
1231-
match result {
1232-
Ok(ws_stream) => {
1233-
tracing::debug!(
1234-
plugin = %plugin_name,
1235-
"host_ws_upgrade: upstream connection established"
1236-
);
1237-
caller.data_mut().ws_upstream = Some(ws_stream);
1238-
0
1239-
}
1240-
Err(err_msg) => {
1241-
tracing::warn!(
1242-
plugin = %plugin_name,
1243-
error = %err_msg,
1244-
"host_ws_upgrade: upstream connection failed"
1245-
);
1246-
caller.data_mut().last_http_result = Some(err_msg.into_bytes());
1247-
-1
1248-
}
1249-
}
1208+
// Store the request; the actual connection happens on the main
1209+
// runtime inside the relay task (see relay_websocket).
1210+
caller.data_mut().ws_upgrade_request = Some(ws_request);
1211+
0
12501212
},
12511213
)
12521214
.map_err(|e| WasmError::Instantiation(format!("failed to add host_ws_upgrade: {}", e)))?;

0 commit comments

Comments
 (0)