Skip to content

Commit e52143b

Browse files
Use dynamic port allocation in operator tests (linux + windows) to prevent port conflicts
Agent-Logs-Url: https://github.com/acl-dev/open-coroutine/sessions/034a9a08-a55e-4f25-b34b-5d4982dfa51a Co-authored-by: loongs-zhang <38336731+loongs-zhang@users.noreply.github.com>
1 parent ea2ce66 commit e52143b

2 files changed

Lines changed: 61 additions & 38 deletions

File tree

core/src/net/operator/linux/tests.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ use io_uring::{opcode, squeue, types, IoUring, SubmissionQueue};
33
use slab::Slab;
44
use std::collections::VecDeque;
55
use std::io::{BufRead, BufReader, Write};
6-
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
6+
use std::net::{SocketAddr, TcpListener, TcpStream};
77
use std::os::unix::io::{AsRawFd, RawFd};
8-
use std::sync::atomic::{AtomicBool, Ordering};
9-
use std::sync::Arc;
8+
use std::sync::{Arc, Condvar, Mutex};
109
use std::time::Duration;
1110
use std::{io, ptr};
1211

@@ -55,20 +54,26 @@ impl AcceptCount {
5554
}
5655
}
5756

58-
fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
57+
fn crate_server(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
5958
let mut ring: IoUring = IoUring::builder()
6059
.setup_sqpoll(1000)
6160
.setup_sqpoll_cpu(0)
6261
.build(1024)?;
63-
let listener = TcpListener::bind(("127.0.0.1", port))?;
62+
let listener = TcpListener::bind("127.0.0.1:0")?;
6463

6564
let mut backlog = VecDeque::new();
6665
let mut bufpool = Vec::with_capacity(64);
6766
let mut buf_alloc = Slab::with_capacity(64);
6867
let mut token_alloc = Slab::with_capacity(64);
6968

70-
println!("listen {}", listener.local_addr()?);
71-
server_started.store(true, Ordering::Release);
69+
let local_addr = listener.local_addr()?;
70+
println!("listen {}", local_addr);
71+
{
72+
let (lock, cvar) = &*server_started;
73+
let mut addr = lock.lock().unwrap();
74+
*addr = Some(local_addr);
75+
cvar.notify_one();
76+
}
7277

7378
let (submitter, mut sq, mut cq) = ring.split();
7479

@@ -238,12 +243,17 @@ fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()
238243
}
239244
}
240245

241-
fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
246+
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
242247
//等服务端起来
243-
while !server_started.load(Ordering::Acquire) {}
244-
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
248+
let socket = {
249+
let (lock, cvar) = &*server_started;
250+
let guard = cvar
251+
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
252+
.unwrap();
253+
guard.unwrap()
254+
};
245255
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
246-
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
256+
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
247257
let mut data: [u8; 512] = [b'1'; 512];
248258
data[511] = b'\n';
249259
let mut buffer: Vec<u8> = Vec::with_capacity(512);
@@ -271,26 +281,31 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
271281

272282
#[test]
273283
fn original() -> anyhow::Result<()> {
274-
let port = 7060;
275-
let server_started = Arc::new(AtomicBool::new(false));
284+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
276285
let clone = server_started.clone();
277-
let handle = std::thread::spawn(move || crate_server(port, clone));
278-
std::thread::spawn(move || crate_client(port, server_started))
286+
let handle = std::thread::spawn(move || crate_server(clone));
287+
std::thread::spawn(move || crate_client(server_started))
279288
.join()
280289
.expect("client has error");
281290
handle.join().expect("server has error")
282291
}
283292

284-
fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
293+
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
285294
let operator = Operator::new(0)?;
286-
let listener = TcpListener::bind(("127.0.0.1", port))?;
295+
let listener = TcpListener::bind("127.0.0.1:0")?;
287296

288297
let mut bufpool = Vec::with_capacity(64);
289298
let mut buf_alloc = Slab::with_capacity(64);
290299
let mut token_alloc = Slab::with_capacity(64);
291300

292-
println!("listen {}", listener.local_addr()?);
293-
server_started.store(true, Ordering::Release);
301+
let local_addr = listener.local_addr()?;
302+
println!("listen {}", local_addr);
303+
{
304+
let (lock, cvar) = &*server_started;
305+
let mut addr = lock.lock().unwrap();
306+
*addr = Some(local_addr);
307+
cvar.notify_one();
308+
}
294309

295310
operator.accept4(
296311
token_alloc.insert(Token::Accept) as _,
@@ -406,11 +421,10 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
406421

407422
#[test]
408423
fn framework() -> anyhow::Result<()> {
409-
let port = 7061;
410-
let server_started = Arc::new(AtomicBool::new(false));
424+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
411425
let clone = server_started.clone();
412-
let handle = std::thread::spawn(move || crate_server2(port, clone));
413-
std::thread::spawn(move || crate_client(port, server_started))
426+
let handle = std::thread::spawn(move || crate_server2(clone));
427+
std::thread::spawn(move || crate_client(server_started))
414428
.join()
415429
.expect("client has error");
416430
handle.join().expect("server has error")

core/src/net/operator/windows/tests.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::net::operator::Operator;
22
use slab::Slab;
33
use std::io::{BufRead, BufReader, Write};
4-
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
4+
use std::net::{SocketAddr, TcpListener, TcpStream};
55
use std::os::windows::io::AsRawSocket;
6-
use std::sync::atomic::{AtomicBool, Ordering};
7-
use std::sync::Arc;
6+
use std::sync::{Arc, Condvar, Mutex};
87
use std::time::Duration;
98
use windows_sys::Win32::Networking::WinSock::{closesocket, recv, send, SOCKET};
109

@@ -23,12 +22,17 @@ enum Token {
2322
},
2423
}
2524

26-
fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
25+
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
2726
//等服务端起来
28-
while !server_started.load(Ordering::Acquire) {}
29-
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
27+
let socket = {
28+
let (lock, cvar) = &*server_started;
29+
let guard = cvar
30+
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
31+
.unwrap();
32+
guard.unwrap()
33+
};
3034
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
31-
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
35+
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
3236
let mut data: [u8; 512] = [b'1'; 512];
3337
data[511] = b'\n';
3438
let mut buffer: Vec<u8> = Vec::with_capacity(512);
@@ -54,16 +58,22 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
5458
println!("client closed");
5559
}
5660

57-
fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
61+
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
5862
let operator = Operator::new(0)?;
59-
let listener = TcpListener::bind(("127.0.0.1", port))?;
63+
let listener = TcpListener::bind("127.0.0.1:0")?;
6064

6165
let mut bufpool = Vec::with_capacity(64);
6266
let mut buf_alloc = Slab::with_capacity(64);
6367
let mut token_alloc = Slab::with_capacity(64);
6468

65-
println!("listen {}", listener.local_addr()?);
66-
server_started.store(true, Ordering::Release);
69+
let local_addr = listener.local_addr()?;
70+
println!("listen {}", local_addr);
71+
{
72+
let (lock, cvar) = &*server_started;
73+
let mut addr = lock.lock().unwrap();
74+
*addr = Some(local_addr);
75+
cvar.notify_one();
76+
}
6777

6878
operator.accept(
6979
token_alloc.insert(Token::Accept) as _,
@@ -184,11 +194,10 @@ fn framework() -> anyhow::Result<()> {
184194
time::format_description::well_known::Rfc2822,
185195
))
186196
.try_init();
187-
let port = 7061;
188-
let server_started = Arc::new(AtomicBool::new(false));
197+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
189198
let clone = server_started.clone();
190-
let handle = std::thread::spawn(move || crate_server2(port, clone));
191-
std::thread::spawn(move || crate_client(port, server_started))
199+
let handle = std::thread::spawn(move || crate_server2(clone));
200+
std::thread::spawn(move || crate_client(server_started))
192201
.join()
193202
.expect("client has error");
194203
handle.join().expect("server has error")

0 commit comments

Comments
 (0)