Skip to content

Commit 8bffd25

Browse files
authored
Merge branch 'master' into copilot/implement-preemptive-feature-windows
2 parents be9844b + 95b56bf commit 8bffd25

12 files changed

Lines changed: 211 additions & 77 deletions

File tree

core/src/co_pool/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ impl<'p> CoroutinePool<'p> {
191191
self.task_queue.is_empty()
192192
}
193193

194+
/// Returns `true` if the local&global task queue is empty.
195+
///
196+
/// Don't care about other local queue, but do care about global queue.
197+
pub fn is_local_empty(&self) -> bool {
198+
self.task_queue.is_local_empty() && self.task_queue.is_global_empty()
199+
}
200+
194201
/// Returns the number of tasks owned by this pool.
195202
pub fn size(&self) -> usize {
196203
self.task_queue.len()

core/src/common/ordered_work_steal.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
185185
self.local_len() == 0
186186
}
187187

188+
/// Returns `true` if the global queue is empty.
189+
pub fn is_global_empty(&self) -> bool {
190+
self.shared.is_empty()
191+
}
192+
188193
/// Returns `true` if all the queues are empty.
189194
pub fn is_empty(&self) -> bool {
190195
self.len() == 0

core/src/net/event_loop.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ impl<'e> EventLoop<'e> {
391391
);
392392
Self::init_current(consumer);
393393
while PoolState::Running == consumer.state()
394-
|| !consumer.is_empty()
394+
|| !consumer.is_local_empty()
395395
|| consumer.get_running_size() > 0
396396
{
397397
_ = consumer.wait_event(Some(SLICE));
@@ -444,7 +444,7 @@ impl<'e> EventLoop<'e> {
444444
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
445445
}
446446
self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?;
447-
if self.is_empty() && self.get_running_size() == 0 {
447+
if self.is_local_empty() && self.get_running_size() == 0 {
448448
assert_eq!(PoolState::Stopping, self.stopped()?);
449449
return Ok(());
450450
}

core/src/net/operator/linux/tests.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ use io_uring::{opcode, squeue, types, IoUring, SubmissionQueue};
33
use slab::Slab;
44
use std::collections::VecDeque;
55
use std::io::{BufRead, BufReader, Write};
6-
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
6+
use std::net::{SocketAddr, TcpListener, TcpStream};
77
use std::os::unix::io::{AsRawFd, RawFd};
8-
use std::sync::atomic::{AtomicBool, Ordering};
9-
use std::sync::Arc;
8+
use std::sync::{Arc, Condvar, Mutex};
109
use std::time::Duration;
1110
use std::{io, ptr};
1211

@@ -55,20 +54,26 @@ impl AcceptCount {
5554
}
5655
}
5756

58-
fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
57+
fn crate_server(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
5958
let mut ring: IoUring = IoUring::builder()
6059
.setup_sqpoll(1000)
6160
.setup_sqpoll_cpu(0)
6261
.build(1024)?;
63-
let listener = TcpListener::bind(("127.0.0.1", port))?;
62+
let listener = TcpListener::bind("127.0.0.1:0")?;
6463

6564
let mut backlog = VecDeque::new();
6665
let mut bufpool = Vec::with_capacity(64);
6766
let mut buf_alloc = Slab::with_capacity(64);
6867
let mut token_alloc = Slab::with_capacity(64);
6968

70-
println!("listen {}", listener.local_addr()?);
71-
server_started.store(true, Ordering::Release);
69+
let local_addr = listener.local_addr()?;
70+
println!("listen {}", local_addr);
71+
{
72+
let (lock, cvar) = &*server_started;
73+
let mut addr = lock.lock().unwrap();
74+
*addr = Some(local_addr);
75+
cvar.notify_one();
76+
}
7277

7378
let (submitter, mut sq, mut cq) = ring.split();
7479

@@ -238,12 +243,17 @@ fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()
238243
}
239244
}
240245

241-
fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
246+
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
242247
//等服务端起来
243-
while !server_started.load(Ordering::Acquire) {}
244-
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
248+
let socket = {
249+
let (lock, cvar) = &*server_started;
250+
let guard = cvar
251+
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
252+
.unwrap();
253+
guard.unwrap()
254+
};
245255
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
246-
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
256+
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
247257
let mut data: [u8; 512] = [b'1'; 512];
248258
data[511] = b'\n';
249259
let mut buffer: Vec<u8> = Vec::with_capacity(512);
@@ -271,26 +281,31 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
271281

272282
#[test]
273283
fn original() -> anyhow::Result<()> {
274-
let port = 7060;
275-
let server_started = Arc::new(AtomicBool::new(false));
284+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
276285
let clone = server_started.clone();
277-
let handle = std::thread::spawn(move || crate_server(port, clone));
278-
std::thread::spawn(move || crate_client(port, server_started))
286+
let handle = std::thread::spawn(move || crate_server(clone));
287+
std::thread::spawn(move || crate_client(server_started))
279288
.join()
280289
.expect("client has error");
281290
handle.join().expect("server has error")
282291
}
283292

284-
fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
293+
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
285294
let operator = Operator::new(0)?;
286-
let listener = TcpListener::bind(("127.0.0.1", port))?;
295+
let listener = TcpListener::bind("127.0.0.1:0")?;
287296

288297
let mut bufpool = Vec::with_capacity(64);
289298
let mut buf_alloc = Slab::with_capacity(64);
290299
let mut token_alloc = Slab::with_capacity(64);
291300

292-
println!("listen {}", listener.local_addr()?);
293-
server_started.store(true, Ordering::Release);
301+
let local_addr = listener.local_addr()?;
302+
println!("listen {}", local_addr);
303+
{
304+
let (lock, cvar) = &*server_started;
305+
let mut addr = lock.lock().unwrap();
306+
*addr = Some(local_addr);
307+
cvar.notify_one();
308+
}
294309

295310
operator.accept4(
296311
token_alloc.insert(Token::Accept) as _,
@@ -406,11 +421,10 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
406421

407422
#[test]
408423
fn framework() -> anyhow::Result<()> {
409-
let port = 7061;
410-
let server_started = Arc::new(AtomicBool::new(false));
424+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
411425
let clone = server_started.clone();
412-
let handle = std::thread::spawn(move || crate_server2(port, clone));
413-
std::thread::spawn(move || crate_client(port, server_started))
426+
let handle = std::thread::spawn(move || crate_server2(clone));
427+
std::thread::spawn(move || crate_client(server_started))
414428
.join()
415429
.expect("client has error");
416430
handle.join().expect("server has error")

core/src/net/operator/windows/tests.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::net::operator::Operator;
22
use slab::Slab;
33
use std::io::{BufRead, BufReader, Write};
4-
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
4+
use std::net::{SocketAddr, TcpListener, TcpStream};
55
use std::os::windows::io::AsRawSocket;
6-
use std::sync::atomic::{AtomicBool, Ordering};
7-
use std::sync::Arc;
6+
use std::sync::{Arc, Condvar, Mutex};
87
use std::time::Duration;
98
use windows_sys::Win32::Networking::WinSock::{closesocket, recv, send, SOCKET};
109

@@ -23,12 +22,17 @@ enum Token {
2322
},
2423
}
2524

26-
fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
25+
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
2726
//等服务端起来
28-
while !server_started.load(Ordering::Acquire) {}
29-
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
27+
let socket = {
28+
let (lock, cvar) = &*server_started;
29+
let guard = cvar
30+
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
31+
.unwrap();
32+
guard.unwrap()
33+
};
3034
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
31-
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
35+
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
3236
let mut data: [u8; 512] = [b'1'; 512];
3337
data[511] = b'\n';
3438
let mut buffer: Vec<u8> = Vec::with_capacity(512);
@@ -54,16 +58,22 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
5458
println!("client closed");
5559
}
5660

57-
fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
61+
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
5862
let operator = Operator::new(0)?;
59-
let listener = TcpListener::bind(("127.0.0.1", port))?;
63+
let listener = TcpListener::bind("127.0.0.1:0")?;
6064

6165
let mut bufpool = Vec::with_capacity(64);
6266
let mut buf_alloc = Slab::with_capacity(64);
6367
let mut token_alloc = Slab::with_capacity(64);
6468

65-
println!("listen {}", listener.local_addr()?);
66-
server_started.store(true, Ordering::Release);
69+
let local_addr = listener.local_addr()?;
70+
println!("listen {}", local_addr);
71+
{
72+
let (lock, cvar) = &*server_started;
73+
let mut addr = lock.lock().unwrap();
74+
*addr = Some(local_addr);
75+
cvar.notify_one();
76+
}
6777

6878
operator.accept(
6979
token_alloc.insert(Token::Accept) as _,
@@ -184,11 +194,10 @@ fn framework() -> anyhow::Result<()> {
184194
time::format_description::well_known::Rfc2822,
185195
))
186196
.try_init();
187-
let port = 7061;
188-
let server_started = Arc::new(AtomicBool::new(false));
197+
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
189198
let clone = server_started.clone();
190-
let handle = std::thread::spawn(move || crate_server2(port, clone));
191-
std::thread::spawn(move || crate_client(port, server_started))
199+
let handle = std::thread::spawn(move || crate_server2(clone));
200+
std::thread::spawn(move || crate_client(server_started))
192201
.join()
193202
.expect("client has error");
194203
handle.join().expect("server has error")

core/src/syscall/unix/connect.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
6565
{
6666
break;
6767
}
68+
unsafe {
69+
let mut address = std::mem::zeroed();
70+
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
71+
r = libc::getpeername(fd, &raw mut address, &raw mut address_len);
72+
}
73+
let connected = r == 0;
6874
let mut err = 0;
6975
unsafe {
7076
let mut len = socklen_t::try_from(size_of_val(&err)).expect("overflow");
@@ -85,10 +91,9 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
8591
r = -1;
8692
break;
8793
}
88-
unsafe {
89-
let mut address = std::mem::zeroed();
90-
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
91-
r = libc::getpeername(fd, &raw mut address, &raw mut address_len);
94+
if !connected {
95+
set_errno(libc::EINPROGRESS);
96+
r = -1;
9297
}
9398
} else if errno != Some(libc::EINTR) {
9499
break;

core/src/syscall/windows/connect.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
6363
).is_err() {
6464
break;
6565
}
66+
unsafe {
67+
let mut address = std::mem::zeroed();
68+
let mut address_len = c_int::try_from(size_of_val(&address)).expect("overflow");
69+
r = getpeername(fd, &raw mut address, &raw mut address_len);
70+
}
71+
let connected = r == 0;
6672
let mut err = 0;
6773
unsafe {
6874
let mut len = c_int::try_from(size_of_val(&err)).expect("overflow");
@@ -83,10 +89,9 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
8389
r = -1;
8490
break;
8591
}
86-
unsafe {
87-
let mut address = std::mem::zeroed();
88-
let mut address_len = c_int::try_from(size_of_val(&address)).expect("overflow");
89-
r = getpeername(fd, &raw mut address, &raw mut address_len);
92+
if !connected {
93+
set_errno(WSAEINPROGRESS.try_into().expect("overflow"));
94+
r = -1;
9095
}
9196
} else if errno != Some(WSAEINTR) {
9297
break;

core/src/syscall/windows/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,21 @@ macro_rules! impl_facade {
8686
}
8787
}
8888
let r = self.inner.$syscall(fn_ptr, $($arg, )*);
89+
// Save errno immediately—logging and coroutine bookkeeping
90+
// call Win32 APIs (e.g. CreateFileW) that clobber GetLastError().
91+
let saved_errno = std::io::Error::last_os_error();
8992
if let Some(co) = $crate::scheduler::SchedulableCoroutine::current() {
9093
if co.running().is_err() {
9194
$crate::error!("{} change to running state failed !", co.name());
9295
}
9396
}
94-
$crate::info!("exit syscall {} {:?} {}", syscall, r, std::io::Error::last_os_error());
97+
$crate::info!("exit syscall {} {:?} {}", syscall, r, saved_errno);
98+
// Restore errno so callers see the correct error.
99+
if let Some(e) = saved_errno.raw_os_error() {
100+
$crate::syscall::set_errno(
101+
u32::try_from(e).unwrap_or_default()
102+
);
103+
}
95104
r
96105
}
97106
}

open-coroutine/examples/socket_co.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
use open_coroutine::task;
22
use std::io::{Error, IoSlice, IoSliceMut, Read, Write};
3-
use std::net::{Shutdown, TcpListener, ToSocketAddrs};
3+
use std::net::{Shutdown, SocketAddr, TcpListener};
44
#[cfg(unix)]
55
use std::os::fd::AsRawFd;
66
use std::sync::{Arc, Condvar, Mutex};
77
use std::time::Duration;
88

9-
pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
10-
let listener = TcpListener::bind(addr).expect("start server failed");
9+
pub fn start_co_server(
10+
server_finished: Arc<(Mutex<bool>, Condvar)>,
11+
server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>,
12+
) {
13+
let listener = TcpListener::bind("127.0.0.1:0").expect("start server failed");
14+
let local_addr = listener.local_addr().expect("get local addr failed");
15+
// Signal that server is ready to accept connections
16+
{
17+
let (lock, cvar) = &*server_started;
18+
let mut addr = lock.lock().unwrap();
19+
*addr = Some(local_addr);
20+
cvar.notify_one();
21+
}
1122
for stream in listener.incoming() {
1223
_ = task!(
1324
|mut socket| {
@@ -77,7 +88,15 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
7788
}
7889
}
7990

80-
pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
91+
pub fn start_co_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
92+
// Wait for server to be ready and get the actual address
93+
let addr = {
94+
let (lock, cvar) = &*server_started;
95+
let guard = cvar
96+
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
97+
.unwrap();
98+
guard.unwrap()
99+
};
81100
_ = task!(
82101
|mut stream| {
83102
let mut buffer1 = [0; 256];
@@ -154,16 +173,17 @@ pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
154173

155174
#[open_coroutine::main(event_loop_size = 1, max_size = 2)]
156175
pub fn main() -> std::io::Result<()> {
157-
let addr = "127.0.0.1:8999";
158176
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
159177
let server_finished = Arc::clone(&server_finished_pair);
178+
let server_started_pair = Arc::new((Mutex::new(None), Condvar::new()));
179+
let server_started = Arc::clone(&server_started_pair);
160180
_ = std::thread::Builder::new()
161181
.name("crate_co_server".to_string())
162-
.spawn(move || start_co_server(addr, server_finished_pair))
182+
.spawn(move || start_co_server(server_finished_pair, server_started_pair))
163183
.expect("failed to spawn thread");
164184
_ = std::thread::Builder::new()
165185
.name("crate_co_client".to_string())
166-
.spawn(move || start_co_client(addr))
186+
.spawn(move || start_co_client(server_started))
167187
.expect("failed to spawn thread");
168188

169189
let (lock, cvar) = &*server_finished;

0 commit comments

Comments
 (0)