Skip to content

Commit 76d0c93

Browse files
authored
test(jetsocat): stabilize jmux proxy cli tests (#1723)
Make the async JMUX wait for listener readiness explicitly and use the tokio command variant consistently.
1 parent 5668b18 commit 76d0c93

File tree

1 file changed

+85
-72
lines changed

1 file changed

+85
-72
lines changed

testsuite/tests/cli/jetsocat.rs

Lines changed: 85 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
1-
use std::io::Read;
21
use std::sync::Arc;
32
use std::time::Duration;
43

54
use expect_test::expect;
65
use rstest::rstest;
76
use test_utils::find_unused_ports;
87
use testsuite::cli::{
9-
assert_stderr_eq, jetsocat_assert_cmd, jetsocat_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port,
8+
assert_stderr_eq, jetsocat_assert_cmd, jetsocat_tokio_cmd, wait_for_port_bound, wait_for_tcp_port,
109
};
1110

12-
// NOTE: Windows needs more time for listeners to be ready due to slower process startup.
11+
#[cfg(windows)]
12+
const WINDOWS_NAMED_PIPE_WAIT_DURATION: Duration = Duration::from_millis(600);
1313

1414
#[cfg(not(windows))]
15-
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(300);
15+
const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(600);
1616
#[cfg(windows)]
17-
const LISTENER_WAIT_DURATION: Duration = Duration::from_millis(600);
17+
const ASSERT_CMD_TIMEOUT: Duration = Duration::from_millis(1300);
1818

1919
#[cfg(not(windows))]
20-
const COMMAND_TIMEOUT: Duration = Duration::from_millis(600);
20+
const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(600);
21+
#[cfg(windows)]
22+
const MCP_REQUEST_SETTLE_DURATION: Duration = Duration::from_millis(1300);
23+
2124
#[cfg(windows)]
22-
const COMMAND_TIMEOUT: Duration = Duration::from_millis(1300);
25+
async fn wait_for_windows_named_pipe_server() {
26+
tokio::time::sleep(WINDOWS_NAMED_PIPE_WAIT_DURATION).await;
27+
}
28+
29+
#[cfg(not(windows))]
30+
async fn wait_for_windows_named_pipe_server() {}
2331

2432
#[test]
2533
fn no_args_shows_help() {
@@ -62,7 +70,7 @@ fn all_subcommands() {
6270
#[case::env_force_color_1(&[], &[("FORCE_COLOR", "1"), ("TERM", "dumb")], true)]
6371
fn log_term_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[case] expect_ansi: bool) {
6472
let output = jetsocat_assert_cmd()
65-
.timeout(COMMAND_TIMEOUT)
73+
.timeout(ASSERT_CMD_TIMEOUT)
6674
.args(["forward", "-", "-", "--log-term"])
6775
.args(args)
6876
.envs(envs.iter().copied())
@@ -95,7 +103,7 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas
95103
let log_file_path = tempdir.path().join("jetsocat.log");
96104

97105
jetsocat_assert_cmd()
98-
.timeout(COMMAND_TIMEOUT)
106+
.timeout(ASSERT_CMD_TIMEOUT)
99107
.args(["forward", "-", "-", "--log-file", log_file_path.to_str().unwrap()])
100108
.args(args)
101109
.envs(envs.iter().copied())
@@ -111,32 +119,32 @@ fn log_file_coloring(#[case] args: &[&str], #[case] envs: &[(&str, &str)], #[cas
111119
}
112120
}
113121

114-
#[test]
115-
fn forward_hello_world() {
122+
#[tokio::test]
123+
async fn forward_hello_world() {
116124
// Find an available port.
117125
let port = find_unused_ports(1)[0];
118126

119127
// Start jetsocat listener in background using JETSOCAT_ARGS.
120-
let mut listener = jetsocat_cmd()
128+
let mut listener = jetsocat_tokio_cmd()
121129
.env(
122130
"JETSOCAT_ARGS",
123131
format!("forward tcp-listen://127.0.0.1:{port} 'cmd://echo hello world' --no-proxy"),
124132
)
133+
.kill_on_drop(true)
125134
.spawn()
126135
.expect("failed to start jetsocat listener");
127136

128-
// Give the listener time to start.
129-
std::thread::sleep(LISTENER_WAIT_DURATION);
137+
wait_for_port_bound(port).await.expect("listener ready");
130138

131139
// Connect to the listener and read the output using assert_cmd.
132140
let client_output = jetsocat_assert_cmd()
133141
.env("JETSOCAT_ARGS", format!("forward - tcp://127.0.0.1:{port}"))
134-
.timeout(COMMAND_TIMEOUT)
142+
.timeout(ASSERT_CMD_TIMEOUT)
135143
.assert();
136144

137145
// Kill the listener.
138-
let _ = listener.kill();
139-
let _ = listener.wait();
146+
let _ = listener.start_kill();
147+
let _ = listener.wait().await;
140148

141149
// Check that we got the expected output.
142150
#[cfg(windows)]
@@ -145,69 +153,71 @@ fn forward_hello_world() {
145153
client_output.success().stdout("hello world\n");
146154
}
147155

148-
#[test]
149-
fn jmux_proxy_read_hello_world() {
156+
#[tokio::test]
157+
async fn jmux_proxy_read_hello_world() {
150158
// Find 3 available ports at once to avoid conflicts.
151159
let ports = find_unused_ports(3);
152160
let echo_server_port = ports[0];
153161
let jmux_server_port = ports[1];
154162
let proxy_listen_port = ports[2];
155163

156164
// Start echo server first.
157-
let mut echo_server = jetsocat_cmd()
165+
let mut echo_server = jetsocat_tokio_cmd()
158166
.env(
159167
"JETSOCAT_ARGS",
160168
format!("forward tcp-listen://127.0.0.1:{echo_server_port} 'cmd://echo hello world' --no-proxy"),
161169
)
170+
.kill_on_drop(true)
162171
.spawn()
163172
.expect("failed to start echo server");
164173

165-
// Give the echo server time to start.
166-
std::thread::sleep(LISTENER_WAIT_DURATION);
174+
wait_for_port_bound(echo_server_port).await.expect("echo server ready");
167175

168176
// Start JMUX server that will accept JMUX connections.
169-
let mut jmux_server = jetsocat_cmd()
177+
let mut jmux_server = jetsocat_tokio_cmd()
170178
.env(
171179
"JETSOCAT_ARGS",
172180
format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"),
173181
)
182+
.kill_on_drop(true)
174183
.spawn()
175184
.expect("failed to start JMUX server");
176185

177-
// Give the JMUX server time to start.
178-
std::thread::sleep(LISTENER_WAIT_DURATION);
186+
wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready");
179187

180188
// Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener.
181189
// This creates a tunnel: client -> proxy_listen_port -> jmux_server_port -> echo_server_port
182-
let mut jmux_client = jetsocat_cmd()
190+
let mut jmux_client = jetsocat_tokio_cmd()
183191
.env(
184192
"JETSOCAT_ARGS",
185193
format!(
186194
"jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{echo_server_port} --no-proxy",
187195
),
188196
)
197+
.kill_on_drop(true)
189198
.spawn()
190199
.expect("failed to start JMUX client");
191200

192-
// Give the JMUX client time to establish connection and set up listener.
193-
std::thread::sleep(LISTENER_WAIT_DURATION);
201+
wait_for_port_bound(proxy_listen_port)
202+
.await
203+
.expect("JMUX client proxy ready");
194204

195205
// Connect to the JMUX client's local listener.
196206
let client_output = jetsocat_assert_cmd()
197207
.env(
198208
"JETSOCAT_ARGS",
199209
format!("forward - tcp://127.0.0.1:{proxy_listen_port}"),
200210
)
201-
.timeout(COMMAND_TIMEOUT)
211+
.timeout(ASSERT_CMD_TIMEOUT)
202212
.assert();
203213

204214
// Kill all processes.
205-
let _ = jmux_client.kill();
206-
let _ = jmux_server.kill();
207-
let _ = echo_server.kill();
208-
let _ = jmux_client.wait();
209-
let _ = jmux_server.wait();
210-
let _ = echo_server.wait();
215+
let _ = jmux_client.start_kill();
216+
let _ = jmux_server.start_kill();
217+
let _ = echo_server.start_kill();
218+
let _ = jmux_client.wait().await;
219+
let _ = jmux_server.wait().await;
220+
let _ = echo_server.wait().await;
211221

212222
// Check that we got the expected output through the JMUX proxy.
213223
#[cfg(windows)]
@@ -216,77 +226,84 @@ fn jmux_proxy_read_hello_world() {
216226
client_output.success().stdout("hello world\n");
217227
}
218228

219-
#[test]
220-
fn jmux_proxy_write_hello_world() {
229+
#[tokio::test]
230+
async fn jmux_proxy_write_hello_world() {
231+
use tokio::io::AsyncReadExt as _;
232+
221233
// Find 3 available ports at once to avoid conflicts.
222234
let ports = find_unused_ports(3);
223235
let read_server_port = ports[0];
224236
let jmux_server_port = ports[1];
225237
let proxy_listen_port = ports[2];
226238

227239
// Start read server first.
228-
let mut read_server = jetsocat_cmd()
240+
let mut read_server = jetsocat_tokio_cmd()
229241
.env(
230242
"JETSOCAT_ARGS",
231243
format!("forward tcp-listen://127.0.0.1:{read_server_port} stdio --no-proxy"),
232244
)
245+
.stdin(std::process::Stdio::null())
233246
.stdout(std::process::Stdio::piped())
247+
.kill_on_drop(true)
234248
.spawn()
235249
.expect("failed to start read server");
236250

237-
// Give the read server time to start.
238-
std::thread::sleep(LISTENER_WAIT_DURATION);
251+
wait_for_port_bound(read_server_port).await.expect("read server ready");
239252

240253
// Start JMUX server that will accept JMUX connections.
241-
let mut jmux_server = jetsocat_cmd()
254+
let mut jmux_server = jetsocat_tokio_cmd()
242255
.env(
243256
"JETSOCAT_ARGS",
244257
format!("jmux-proxy tcp-listen://127.0.0.1:{jmux_server_port} --allow-all --no-proxy"),
245258
)
259+
.kill_on_drop(true)
246260
.spawn()
247261
.expect("failed to start JMUX server");
248262

249-
// Give the JMUX server time to start.
250-
std::thread::sleep(LISTENER_WAIT_DURATION);
263+
wait_for_port_bound(jmux_server_port).await.expect("JMUX server ready");
251264

252265
// Start JMUX client proxy that connects to the JMUX server and provides a local TCP listener.
253-
let mut jmux_client = jetsocat_cmd()
266+
let mut jmux_client = jetsocat_tokio_cmd()
254267
.env(
255268
"JETSOCAT_ARGS",
256269
format!(
257270
"jmux-proxy tcp://127.0.0.1:{jmux_server_port} tcp-listen://127.0.0.1:{proxy_listen_port}/127.0.0.1:{read_server_port} --no-proxy",
258271
),
259272
)
273+
.kill_on_drop(true)
260274
.spawn()
261275
.expect("failed to start JMUX client");
262276

263-
// Give the JMUX client time to establish connection and set up listener.
264-
std::thread::sleep(LISTENER_WAIT_DURATION);
277+
wait_for_port_bound(proxy_listen_port)
278+
.await
279+
.expect("JMUX client proxy ready");
265280

266281
// Connect to the JMUX client's local listener.
267282
jetsocat_assert_cmd()
268283
.env(
269284
"JETSOCAT_ARGS",
270-
format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world'"),
285+
format!("forward tcp://127.0.0.1:{proxy_listen_port} 'cmd://echo hello world' --no-proxy"),
271286
)
272-
.timeout(COMMAND_TIMEOUT)
287+
.timeout(ASSERT_CMD_TIMEOUT)
273288
.assert()
274289
.success();
275290

276291
// Kill all processes.
277-
let _ = jmux_client.kill();
278-
let _ = jmux_server.kill();
279-
let _ = read_server.kill();
280-
let _ = jmux_client.wait();
281-
let _ = jmux_server.wait();
282-
let _ = read_server.wait();
292+
let _ = jmux_client.start_kill();
293+
let _ = jmux_server.start_kill();
294+
let _ = read_server.start_kill();
295+
let _ = jmux_client.wait().await;
296+
let _ = jmux_server.wait().await;
297+
let _ = read_server.wait().await;
283298

284299
// Check that the read server received the payload.
285300
let mut read_server_stdout = String::new();
286301
read_server
287302
.stdout
303+
.take()
288304
.unwrap()
289305
.read_to_string(&mut read_server_stdout)
306+
.await
290307
.unwrap();
291308
assert_eq!(read_server_stdout.trim(), "hello world");
292309
}
@@ -621,7 +638,7 @@ fn jetsocat_log_environment_variable() {
621638
outfile.display()
622639
),
623640
)
624-
.timeout(COMMAND_TIMEOUT)
641+
.timeout(ASSERT_CMD_TIMEOUT)
625642
.assert();
626643

627644
let stdout = std::str::from_utf8(&output.get_output().stdout).unwrap();
@@ -747,8 +764,9 @@ async fn mcp_proxy_smoke_test(#[values(true, false)] http_transport: bool) {
747764
let server = McpServer::new(transport);
748765
let _server_handle = server.start().expect("start MCP server");
749766

750-
// Give the server time to start.
751-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
767+
if !http_transport {
768+
wait_for_windows_named_pipe_server().await;
769+
}
752770

753771
// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
754772
let mut jetsocat_process = jetsocat_tokio_cmd()
@@ -819,8 +837,9 @@ async fn mcp_proxy_with_tools(#[values(true, false)] http_transport: bool) {
819837
McpServer::new(transport).with_config(ServerConfig::new().with_tool(EchoTool).with_tool(CalculatorTool));
820838
let _server_handle = server.start().expect("start MCP server");
821839

822-
// Give the server time to start.
823-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
840+
if !http_transport {
841+
wait_for_windows_named_pipe_server().await;
842+
}
824843

825844
// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
826845
let mut jetsocat_process = jetsocat_tokio_cmd()
@@ -962,8 +981,9 @@ async fn mcp_proxy_notification(#[values(true, false)] http_transport: bool) {
962981
McpServer::new(transport).with_config(ServerConfig::new().with_notification_handler(notification_handler));
963982
let _server_handle = server.start().expect("start MCP server");
964983

965-
// Give the server time to start.
966-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
984+
if !http_transport {
985+
wait_for_windows_named_pipe_server().await;
986+
}
967987

968988
// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
969989
let mut jetsocat_process = jetsocat_tokio_cmd()
@@ -1010,9 +1030,6 @@ async fn execute_mcp_request(request: &str) -> String {
10101030
let server = McpServer::new(DynMcpTransport::new_box(transport));
10111031
let server_handle = server.start().expect("start MCP server");
10121032

1013-
// Give the server time to start.
1014-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
1015-
10161033
// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
10171034
let mut jetsocat_process = jetsocat_tokio_cmd()
10181035
.args(["mcp-proxy", "stdio", &server_url, "--log-term", "--color=never"])
@@ -1028,7 +1045,7 @@ async fn execute_mcp_request(request: &str) -> String {
10281045
// Write the request.
10291046
stdin.write_all(request.as_bytes()).await.unwrap();
10301047

1031-
tokio::time::sleep(COMMAND_TIMEOUT).await;
1048+
tokio::time::sleep(MCP_REQUEST_SETTLE_DURATION).await;
10321049

10331050
// Shutdown the MCP server.
10341051
server_handle.shutdown();
@@ -1073,9 +1090,6 @@ async fn mcp_proxy_http_error() {
10731090
let server = McpServer::new(DynMcpTransport::new_box(transport));
10741091
let _server_handle = server.start().expect("start MCP server");
10751092

1076-
// Give the server time to start.
1077-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
1078-
10791093
// Start jetsocat mcp-proxy with stdio pipe and HTTP transport.
10801094
let mut jetsocat_process = jetsocat_tokio_cmd()
10811095
.args(["mcp-proxy", "stdio", &server_url])
@@ -1114,8 +1128,7 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
11141128
let server = McpServer::new(DynMcpTransport::new_box(np_transport));
11151129
let server_handle = server.start().expect("start MCP server");
11161130

1117-
// Give the server time to start.
1118-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
1131+
wait_for_windows_named_pipe_server().await;
11191132

11201133
// Start jetsocat mcp-proxy with stdio pipe.
11211134
let mut jetsocat_process = jetsocat_tokio_cmd()
@@ -1141,8 +1154,8 @@ async fn mcp_proxy_terminated_on_broken_pipe() {
11411154
// Stop the MCP server.
11421155
server_handle.shutdown();
11431156

1144-
// Wait for server to shut down.
1145-
tokio::time::sleep(LISTENER_WAIT_DURATION).await;
1157+
// Wait for the named pipe instance to be torn down on Windows.
1158+
wait_for_windows_named_pipe_server().await;
11461159

11471160
// Try to send a request - this should fail with a broken pipe error.
11481161
// The proxy will detect this and send an error response, then close.

0 commit comments

Comments
 (0)