Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl<'p> CoroutinePool<'p> {
self.task_queue.is_empty()
}

/// Returns `true` if the local&global task queue is empty.
///
/// Don't care about other local queue, but do care about global queue.
pub fn is_local_empty(&self) -> bool {
self.task_queue.is_local_empty() && self.task_queue.is_global_empty()
}

/// Returns the number of tasks owned by this pool.
pub fn size(&self) -> usize {
self.task_queue.len()
Expand Down
5 changes: 5 additions & 0 deletions core/src/common/ordered_work_steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
self.local_len() == 0
}

/// Returns `true` if the global queue is empty.
pub fn is_global_empty(&self) -> bool {
self.shared.is_empty()
}

/// Returns `true` if all the queues are empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
Expand Down
4 changes: 2 additions & 2 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl<'e> EventLoop<'e> {
);
Self::init_current(consumer);
while PoolState::Running == consumer.state()
|| !consumer.is_empty()
|| !consumer.is_local_empty()
|| consumer.get_running_size() > 0
{
_ = consumer.wait_event(Some(SLICE));
Expand Down Expand Up @@ -444,7 +444,7 @@ impl<'e> EventLoop<'e> {
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
}
self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?;
if self.is_empty() && self.get_running_size() == 0 {
if self.is_local_empty() && self.get_running_size() == 0 {
assert_eq!(PoolState::Stopping, self.stopped()?);
return Ok(());
}
Expand Down
60 changes: 37 additions & 23 deletions core/src/net/operator/linux/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use io_uring::{opcode, squeue, types, IoUring, SubmissionQueue};
use slab::Slab;
use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::{io, ptr};

Expand Down Expand Up @@ -55,20 +54,26 @@ impl AcceptCount {
}
}

fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
fn crate_server(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
let mut ring: IoUring = IoUring::builder()
.setup_sqpoll(1000)
.setup_sqpoll_cpu(0)
.build(1024)?;
let listener = TcpListener::bind(("127.0.0.1", port))?;
let listener = TcpListener::bind("127.0.0.1:0")?;

let mut backlog = VecDeque::new();
let mut bufpool = Vec::with_capacity(64);
let mut buf_alloc = Slab::with_capacity(64);
let mut token_alloc = Slab::with_capacity(64);

println!("listen {}", listener.local_addr()?);
server_started.store(true, Ordering::Release);
let local_addr = listener.local_addr()?;
println!("listen {}", local_addr);
{
let (lock, cvar) = &*server_started;
let mut addr = lock.lock().unwrap();
*addr = Some(local_addr);
cvar.notify_one();
}

let (submitter, mut sq, mut cq) = ring.split();

Expand Down Expand Up @@ -238,12 +243,17 @@ fn crate_server(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()
}
}

fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
//等服务端起来
while !server_started.load(Ordering::Acquire) {}
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
let socket = {
let (lock, cvar) = &*server_started;
let guard = cvar
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
.unwrap();
guard.unwrap()
};
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
let mut data: [u8; 512] = [b'1'; 512];
data[511] = b'\n';
let mut buffer: Vec<u8> = Vec::with_capacity(512);
Expand Down Expand Up @@ -271,26 +281,31 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {

#[test]
fn original() -> anyhow::Result<()> {
let port = 7060;
let server_started = Arc::new(AtomicBool::new(false));
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
let clone = server_started.clone();
let handle = std::thread::spawn(move || crate_server(port, clone));
std::thread::spawn(move || crate_client(port, server_started))
let handle = std::thread::spawn(move || crate_server(clone));
std::thread::spawn(move || crate_client(server_started))
.join()
.expect("client has error");
handle.join().expect("server has error")
}

fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
let operator = Operator::new(0)?;
let listener = TcpListener::bind(("127.0.0.1", port))?;
let listener = TcpListener::bind("127.0.0.1:0")?;

let mut bufpool = Vec::with_capacity(64);
let mut buf_alloc = Slab::with_capacity(64);
let mut token_alloc = Slab::with_capacity(64);

println!("listen {}", listener.local_addr()?);
server_started.store(true, Ordering::Release);
let local_addr = listener.local_addr()?;
println!("listen {}", local_addr);
{
let (lock, cvar) = &*server_started;
let mut addr = lock.lock().unwrap();
*addr = Some(local_addr);
cvar.notify_one();
}

operator.accept4(
token_alloc.insert(Token::Accept) as _,
Expand Down Expand Up @@ -406,11 +421,10 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(

#[test]
fn framework() -> anyhow::Result<()> {
let port = 7061;
let server_started = Arc::new(AtomicBool::new(false));
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
let clone = server_started.clone();
let handle = std::thread::spawn(move || crate_server2(port, clone));
std::thread::spawn(move || crate_client(port, server_started))
let handle = std::thread::spawn(move || crate_server2(clone));
std::thread::spawn(move || crate_client(server_started))
.join()
.expect("client has error");
handle.join().expect("server has error")
Expand Down
39 changes: 24 additions & 15 deletions core/src/net/operator/windows/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::net::operator::Operator;
use slab::Slab;
use std::io::{BufRead, BufReader, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::windows::io::AsRawSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use windows_sys::Win32::Networking::WinSock::{closesocket, recv, send, SOCKET};

Expand All @@ -23,12 +22,17 @@ enum Token {
},
}

fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
fn crate_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
//等服务端起来
while !server_started.load(Ordering::Acquire) {}
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
let socket = {
let (lock, cvar) = &*server_started;
let guard = cvar
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
.unwrap();
guard.unwrap()
};
let mut stream = TcpStream::connect_timeout(&socket, Duration::from_secs(3))
.unwrap_or_else(|_| panic!("connect to 127.0.0.1:{port} failed !"));
.unwrap_or_else(|_| panic!("connect to {} failed !", socket));
let mut data: [u8; 512] = [b'1'; 512];
data[511] = b'\n';
let mut buffer: Vec<u8> = Vec::with_capacity(512);
Expand All @@ -54,16 +58,22 @@ fn crate_client(port: u16, server_started: Arc<AtomicBool>) {
println!("client closed");
}

fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<()> {
fn crate_server2(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) -> anyhow::Result<()> {
let operator = Operator::new(0)?;
let listener = TcpListener::bind(("127.0.0.1", port))?;
let listener = TcpListener::bind("127.0.0.1:0")?;

let mut bufpool = Vec::with_capacity(64);
let mut buf_alloc = Slab::with_capacity(64);
let mut token_alloc = Slab::with_capacity(64);

println!("listen {}", listener.local_addr()?);
server_started.store(true, Ordering::Release);
let local_addr = listener.local_addr()?;
println!("listen {}", local_addr);
{
let (lock, cvar) = &*server_started;
let mut addr = lock.lock().unwrap();
*addr = Some(local_addr);
cvar.notify_one();
}

operator.accept(
token_alloc.insert(Token::Accept) as _,
Expand Down Expand Up @@ -184,11 +194,10 @@ fn framework() -> anyhow::Result<()> {
time::format_description::well_known::Rfc2822,
))
.try_init();
let port = 7061;
let server_started = Arc::new(AtomicBool::new(false));
let server_started = Arc::new((Mutex::new(None), Condvar::new()));
let clone = server_started.clone();
let handle = std::thread::spawn(move || crate_server2(port, clone));
std::thread::spawn(move || crate_client(port, server_started))
let handle = std::thread::spawn(move || crate_server2(clone));
std::thread::spawn(move || crate_client(server_started))
.join()
.expect("client has error");
handle.join().expect("server has error")
Expand Down
13 changes: 9 additions & 4 deletions core/src/syscall/unix/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
{
break;
}
unsafe {
let mut address = std::mem::zeroed();
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
r = libc::getpeername(fd, &raw mut address, &raw mut address_len);
}
let connected = r == 0;
let mut err = 0;
unsafe {
let mut len = socklen_t::try_from(size_of_val(&err)).expect("overflow");
Expand All @@ -85,10 +91,9 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
r = -1;
break;
}
unsafe {
let mut address = std::mem::zeroed();
let mut address_len = socklen_t::try_from(size_of_val(&address)).expect("overflow");
r = libc::getpeername(fd, &raw mut address, &raw mut address_len);
if !connected {
set_errno(libc::EINPROGRESS);
r = -1;
}
} else if errno != Some(libc::EINTR) {
break;
Expand Down
13 changes: 9 additions & 4 deletions core/src/syscall/windows/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
).is_err() {
break;
}
unsafe {
let mut address = std::mem::zeroed();
let mut address_len = c_int::try_from(size_of_val(&address)).expect("overflow");
r = getpeername(fd, &raw mut address, &raw mut address_len);
}
let connected = r == 0;
let mut err = 0;
unsafe {
let mut len = c_int::try_from(size_of_val(&err)).expect("overflow");
Expand All @@ -83,10 +89,9 @@ impl<I: ConnectSyscall> ConnectSyscall for NioConnectSyscall<I> {
r = -1;
break;
}
unsafe {
let mut address = std::mem::zeroed();
let mut address_len = c_int::try_from(size_of_val(&address)).expect("overflow");
r = getpeername(fd, &raw mut address, &raw mut address_len);
if !connected {
set_errno(WSAEINPROGRESS.try_into().expect("overflow"));
r = -1;
}
} else if errno != Some(WSAEINTR) {
break;
Expand Down
11 changes: 10 additions & 1 deletion core/src/syscall/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,21 @@ macro_rules! impl_facade {
}
}
let r = self.inner.$syscall(fn_ptr, $($arg, )*);
// Save errno immediately—logging and coroutine bookkeeping
// call Win32 APIs (e.g. CreateFileW) that clobber GetLastError().
let saved_errno = std::io::Error::last_os_error();
if let Some(co) = $crate::scheduler::SchedulableCoroutine::current() {
if co.running().is_err() {
$crate::error!("{} change to running state failed !", co.name());
}
}
$crate::info!("exit syscall {} {:?} {}", syscall, r, std::io::Error::last_os_error());
$crate::info!("exit syscall {} {:?} {}", syscall, r, saved_errno);
// Restore errno so callers see the correct error.
if let Some(e) = saved_errno.raw_os_error() {
$crate::syscall::set_errno(
u32::try_from(e).unwrap_or_default()
);
}
r
}
}
Expand Down
34 changes: 27 additions & 7 deletions open-coroutine/examples/socket_co.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
use open_coroutine::task;
use std::io::{Error, IoSlice, IoSliceMut, Read, Write};
use std::net::{Shutdown, TcpListener, ToSocketAddrs};
use std::net::{Shutdown, SocketAddr, TcpListener};
#[cfg(unix)]
use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
let listener = TcpListener::bind(addr).expect("start server failed");
pub fn start_co_server(
server_finished: Arc<(Mutex<bool>, Condvar)>,
server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>,
) {
let listener = TcpListener::bind("127.0.0.1:0").expect("start server failed");
let local_addr = listener.local_addr().expect("get local addr failed");
// Signal that server is ready to accept connections
{
let (lock, cvar) = &*server_started;
let mut addr = lock.lock().unwrap();
*addr = Some(local_addr);
cvar.notify_one();
}
for stream in listener.incoming() {
_ = task!(
|mut socket| {
Expand Down Expand Up @@ -77,7 +88,15 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
}
}

pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
pub fn start_co_client(server_started: Arc<(Mutex<Option<SocketAddr>>, Condvar)>) {
// Wait for server to be ready and get the actual address
let addr = {
let (lock, cvar) = &*server_started;
let guard = cvar
.wait_while(lock.lock().unwrap(), |addr| addr.is_none())
.unwrap();
guard.unwrap()
};
_ = task!(
|mut stream| {
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -154,16 +173,17 @@ pub fn start_co_client<A: ToSocketAddrs>(addr: A) {

#[open_coroutine::main(event_loop_size = 1, max_size = 2)]
pub fn main() -> std::io::Result<()> {
let addr = "127.0.0.1:8999";
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
let server_finished = Arc::clone(&server_finished_pair);
let server_started_pair = Arc::new((Mutex::new(None), Condvar::new()));
let server_started = Arc::clone(&server_started_pair);
_ = std::thread::Builder::new()
.name("crate_co_server".to_string())
.spawn(move || start_co_server(addr, server_finished_pair))
.spawn(move || start_co_server(server_finished_pair, server_started_pair))
.expect("failed to spawn thread");
_ = std::thread::Builder::new()
.name("crate_co_client".to_string())
.spawn(move || start_co_client(addr))
.spawn(move || start_co_client(server_started))
.expect("failed to spawn thread");

let (lock, cvar) = &*server_finished;
Expand Down
Loading
Loading