Skip to content

Commit 0013de0

Browse files
Fix socket test race: add server-ready synchronization before client connects
Agent-Logs-Url: https://github.com/acl-dev/open-coroutine/sessions/a2ca3c38-53d1-46ce-9e2c-d5e91dba0877 Co-authored-by: loongs-zhang <38336731+loongs-zhang@users.noreply.github.com>
1 parent b70b6ab commit 0013de0

4 files changed

Lines changed: 96 additions & 16 deletions

File tree

open-coroutine/examples/socket_co.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
66
use std::sync::{Arc, Condvar, Mutex};
77
use std::time::Duration;
88

9-
pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
9+
pub fn start_co_server<A: ToSocketAddrs>(
10+
addr: A,
11+
server_finished: Arc<(Mutex<bool>, Condvar)>,
12+
server_started: Arc<(Mutex<bool>, Condvar)>,
13+
) {
1014
let listener = TcpListener::bind(addr).expect("start server failed");
15+
// Signal that server is ready to accept connections
16+
{
17+
let (lock, cvar) = &*server_started;
18+
let mut started = lock.lock().unwrap();
19+
*started = true;
20+
cvar.notify_one();
21+
}
1122
for stream in listener.incoming() {
1223
_ = task!(
1324
|mut socket| {
@@ -77,7 +88,14 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
7788
}
7889
}
7990

80-
pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
91+
pub fn start_co_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
92+
// Wait for server to be ready
93+
{
94+
let (lock, cvar) = &*server_started;
95+
let _guard = cvar
96+
.wait_while(lock.lock().unwrap(), |started| !*started)
97+
.unwrap();
98+
}
8199
_ = task!(
82100
|mut stream| {
83101
let mut buffer1 = [0; 256];
@@ -157,13 +175,15 @@ pub fn main() -> std::io::Result<()> {
157175
let addr = "127.0.0.1:8999";
158176
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
159177
let server_finished = Arc::clone(&server_finished_pair);
178+
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
179+
let server_started = Arc::clone(&server_started_pair);
160180
_ = std::thread::Builder::new()
161181
.name("crate_co_server".to_string())
162-
.spawn(move || start_co_server(addr, server_finished_pair))
182+
.spawn(move || start_co_server(addr, server_finished_pair, server_started_pair))
163183
.expect("failed to spawn thread");
164184
_ = std::thread::Builder::new()
165185
.name("crate_co_client".to_string())
166-
.spawn(move || start_co_client(addr))
186+
.spawn(move || start_co_client(addr, server_started))
167187
.expect("failed to spawn thread");
168188

169189
let (lock, cvar) = &*server_finished;

open-coroutine/examples/socket_co_client.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
66
use std::sync::{Arc, Condvar, Mutex};
77
use std::time::Duration;
88

9-
pub fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
9+
pub fn start_server<A: ToSocketAddrs>(
10+
addr: A,
11+
server_finished: Arc<(Mutex<bool>, Condvar)>,
12+
server_started: Arc<(Mutex<bool>, Condvar)>,
13+
) {
1014
let listener = TcpListener::bind(addr).expect("start server failed");
15+
// Signal that server is ready to accept connections
16+
{
17+
let (lock, cvar) = &*server_started;
18+
let mut started = lock.lock().unwrap();
19+
*started = true;
20+
cvar.notify_one();
21+
}
1122
for stream in listener.incoming() {
1223
let mut socket = stream.expect("accept new connection failed");
1324
let mut buffer1 = [0; 256];
@@ -72,7 +83,14 @@ pub fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>
7283
}
7384
}
7485

75-
pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
86+
pub fn start_co_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
87+
// Wait for server to be ready
88+
{
89+
let (lock, cvar) = &*server_started;
90+
let _guard = cvar
91+
.wait_while(lock.lock().unwrap(), |started| !*started)
92+
.unwrap();
93+
}
7694
_ = task!(
7795
|mut stream| {
7896
let mut buffer1 = [0; 256];
@@ -152,13 +170,15 @@ pub fn main() -> std::io::Result<()> {
152170
let addr = "127.0.0.1:8899";
153171
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
154172
let server_finished = Arc::clone(&server_finished_pair);
173+
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
174+
let server_started = Arc::clone(&server_started_pair);
155175
_ = std::thread::Builder::new()
156176
.name("crate_server".to_string())
157-
.spawn(move || start_server(addr, server_finished_pair))
177+
.spawn(move || start_server(addr, server_finished_pair, server_started_pair))
158178
.expect("failed to spawn thread");
159179
_ = std::thread::Builder::new()
160180
.name("crate_co_client".to_string())
161-
.spawn(move || start_co_client(addr))
181+
.spawn(move || start_co_client(addr, server_started))
162182
.expect("failed to spawn thread");
163183

164184
let (lock, cvar) = &*server_finished;

open-coroutine/examples/socket_co_server.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
66
use std::sync::{Arc, Condvar, Mutex};
77
use std::time::Duration;
88

9-
pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
9+
pub fn start_co_server<A: ToSocketAddrs>(
10+
addr: A,
11+
server_finished: Arc<(Mutex<bool>, Condvar)>,
12+
server_started: Arc<(Mutex<bool>, Condvar)>,
13+
) {
1014
let listener = TcpListener::bind(addr).expect("start server failed");
15+
// Signal that server is ready to accept connections
16+
{
17+
let (lock, cvar) = &*server_started;
18+
let mut started = lock.lock().unwrap();
19+
*started = true;
20+
cvar.notify_one();
21+
}
1122
for stream in listener.incoming() {
1223
_ = task!(
1324
|mut socket| {
@@ -77,7 +88,14 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
7788
}
7889
}
7990

80-
pub fn start_client<A: ToSocketAddrs>(addr: A) {
91+
pub fn start_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
92+
// Wait for server to be ready
93+
{
94+
let (lock, cvar) = &*server_started;
95+
let _guard = cvar
96+
.wait_while(lock.lock().unwrap(), |started| !*started)
97+
.unwrap();
98+
}
8199
let mut stream =
82100
open_coroutine::connect_timeout(addr, Duration::from_secs(3)).expect("connect failed");
83101
let mut buffer1 = [0; 256];
@@ -154,13 +172,15 @@ pub fn main() -> std::io::Result<()> {
154172
let addr = "127.0.0.1:8889";
155173
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
156174
let server_finished = Arc::clone(&server_finished_pair);
175+
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
176+
let server_started = Arc::clone(&server_started_pair);
157177
_ = std::thread::Builder::new()
158178
.name("crate_co_server".to_string())
159-
.spawn(move || start_co_server(addr, server_finished_pair))
179+
.spawn(move || start_co_server(addr, server_finished_pair, server_started_pair))
160180
.expect("failed to spawn thread");
161181
_ = std::thread::Builder::new()
162182
.name("crate_client".to_string())
163-
.spawn(move || start_client(addr))
183+
.spawn(move || start_client(addr, server_started))
164184
.expect("failed to spawn thread");
165185

166186
let (lock, cvar) = &*server_finished;

open-coroutine/examples/socket_not_co.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,19 @@ use std::os::fd::AsRawFd;
55
use std::sync::{Arc, Condvar, Mutex};
66
use std::time::Duration;
77

8-
fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
8+
fn start_server<A: ToSocketAddrs>(
9+
addr: A,
10+
server_finished: Arc<(Mutex<bool>, Condvar)>,
11+
server_started: Arc<(Mutex<bool>, Condvar)>,
12+
) {
913
let listener = TcpListener::bind(addr).expect("start server failed");
14+
// Signal that server is ready to accept connections
15+
{
16+
let (lock, cvar) = &*server_started;
17+
let mut started = lock.lock().unwrap();
18+
*started = true;
19+
cvar.notify_one();
20+
}
1021
for stream in listener.incoming() {
1122
let mut socket = stream.expect("accept new connection failed");
1223
let mut buffer1 = [0; 256];
@@ -71,7 +82,14 @@ fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Co
7182
}
7283
}
7384

74-
fn start_client<A: ToSocketAddrs>(addr: A) {
85+
fn start_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
86+
// Wait for server to be ready
87+
{
88+
let (lock, cvar) = &*server_started;
89+
let _guard = cvar
90+
.wait_while(lock.lock().unwrap(), |started| !*started)
91+
.unwrap();
92+
}
7593
let mut stream =
7694
open_coroutine::connect_timeout(addr, Duration::from_secs(1)).expect("connect failed");
7795
let mut buffer1 = [0; 256];
@@ -148,13 +166,15 @@ pub fn main() -> std::io::Result<()> {
148166
let addr = "127.0.0.1:8888";
149167
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
150168
let server_finished = Arc::clone(&server_finished_pair);
169+
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
170+
let server_started = Arc::clone(&server_started_pair);
151171
_ = std::thread::Builder::new()
152172
.name("crate_server".to_string())
153-
.spawn(move || start_server(addr, server_finished_pair))
173+
.spawn(move || start_server(addr, server_finished_pair, server_started_pair))
154174
.expect("failed to spawn thread");
155175
_ = std::thread::Builder::new()
156176
.name("crate_client".to_string())
157-
.spawn(move || start_client(addr))
177+
.spawn(move || start_client(addr, server_started))
158178
.expect("failed to spawn thread");
159179

160180
let (lock, cvar) = &*server_finished;

0 commit comments

Comments
 (0)