Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl<'p> CoroutinePool<'p> {
self.task_queue.is_empty()
}

/// Returns `true` if the local&global task queue is empty.
///
/// Don't care about other local queue, but do care about global queue.
pub fn is_local_empty(&self) -> bool {
self.task_queue.is_local_empty() && self.task_queue.is_global_empty()
}

/// Returns the number of tasks owned by this pool.
pub fn size(&self) -> usize {
self.task_queue.len()
Expand Down
5 changes: 5 additions & 0 deletions core/src/common/ordered_work_steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
self.local_len() == 0
}

/// Returns `true` if the global queue is empty.
pub fn is_global_empty(&self) -> bool {
self.shared.is_empty()
}

/// Returns `true` if all the queues are empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
Expand Down
4 changes: 2 additions & 2 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl<'e> EventLoop<'e> {
);
Self::init_current(consumer);
while PoolState::Running == consumer.state()
|| !consumer.is_empty()
|| !consumer.is_local_empty()
|| consumer.get_running_size() > 0
{
_ = consumer.wait_event(Some(SLICE));
Expand Down Expand Up @@ -444,7 +444,7 @@ impl<'e> EventLoop<'e> {
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
}
self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?;
if self.is_empty() && self.get_running_size() == 0 {
if self.is_local_empty() && self.get_running_size() == 0 {
assert_eq!(PoolState::Stopping, self.stopped()?);
return Ok(());
}
Expand Down
11 changes: 10 additions & 1 deletion core/src/syscall/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,21 @@ macro_rules! impl_facade {
}
}
let r = self.inner.$syscall(fn_ptr, $($arg, )*);
// Save errno immediately—logging and coroutine bookkeeping
// call Win32 APIs (e.g. CreateFileW) that clobber GetLastError().
let saved_errno = std::io::Error::last_os_error();
if let Some(co) = $crate::scheduler::SchedulableCoroutine::current() {
if co.running().is_err() {
$crate::error!("{} change to running state failed !", co.name());
}
}
$crate::info!("exit syscall {} {:?} {}", syscall, r, std::io::Error::last_os_error());
$crate::info!("exit syscall {} {:?} {}", syscall, r, saved_errno);
// Restore errno so callers see the correct error.
if let Some(e) = saved_errno.raw_os_error() {
$crate::syscall::set_errno(
u32::try_from(e).unwrap_or_default()
);
}
r
}
}
Expand Down
28 changes: 24 additions & 4 deletions open-coroutine/examples/socket_co.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
pub fn start_co_server<A: ToSocketAddrs>(
addr: A,
server_finished: Arc<(Mutex<bool>, Condvar)>,
server_started: Arc<(Mutex<bool>, Condvar)>,
) {
let listener = TcpListener::bind(addr).expect("start server failed");
// Signal that server is ready to accept connections
{
let (lock, cvar) = &*server_started;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
for stream in listener.incoming() {
_ = task!(
|mut socket| {
Expand Down Expand Up @@ -77,7 +88,14 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
}
}

pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
pub fn start_co_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
// Wait for server to be ready
{
let (lock, cvar) = &*server_started;
let _guard = cvar
.wait_while(lock.lock().unwrap(), |started| !*started)
.unwrap();
}
_ = task!(
|mut stream| {
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -157,13 +175,15 @@ pub fn main() -> std::io::Result<()> {
let addr = "127.0.0.1:8999";
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
let server_finished = Arc::clone(&server_finished_pair);
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
let server_started = Arc::clone(&server_started_pair);
_ = std::thread::Builder::new()
.name("crate_co_server".to_string())
.spawn(move || start_co_server(addr, server_finished_pair))
.spawn(move || start_co_server(addr, server_finished_pair, server_started_pair))
.expect("failed to spawn thread");
_ = std::thread::Builder::new()
.name("crate_co_client".to_string())
.spawn(move || start_co_client(addr))
.spawn(move || start_co_client(addr, server_started))
.expect("failed to spawn thread");

let (lock, cvar) = &*server_finished;
Expand Down
28 changes: 24 additions & 4 deletions open-coroutine/examples/socket_co_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

pub fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
pub fn start_server<A: ToSocketAddrs>(
addr: A,
server_finished: Arc<(Mutex<bool>, Condvar)>,
server_started: Arc<(Mutex<bool>, Condvar)>,
) {
let listener = TcpListener::bind(addr).expect("start server failed");
// Signal that server is ready to accept connections
{
let (lock, cvar) = &*server_started;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
for stream in listener.incoming() {
let mut socket = stream.expect("accept new connection failed");
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -72,7 +83,14 @@ pub fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>
}
}

pub fn start_co_client<A: ToSocketAddrs>(addr: A) {
pub fn start_co_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
// Wait for server to be ready
{
let (lock, cvar) = &*server_started;
let _guard = cvar
.wait_while(lock.lock().unwrap(), |started| !*started)
.unwrap();
}
_ = task!(
|mut stream| {
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -152,13 +170,15 @@ pub fn main() -> std::io::Result<()> {
let addr = "127.0.0.1:8899";
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
let server_finished = Arc::clone(&server_finished_pair);
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
let server_started = Arc::clone(&server_started_pair);
_ = std::thread::Builder::new()
.name("crate_server".to_string())
.spawn(move || start_server(addr, server_finished_pair))
.spawn(move || start_server(addr, server_finished_pair, server_started_pair))
.expect("failed to spawn thread");
_ = std::thread::Builder::new()
.name("crate_co_client".to_string())
.spawn(move || start_co_client(addr))
.spawn(move || start_co_client(addr, server_started))
.expect("failed to spawn thread");

let (lock, cvar) = &*server_finished;
Expand Down
28 changes: 24 additions & 4 deletions open-coroutine/examples/socket_co_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
pub fn start_co_server<A: ToSocketAddrs>(
addr: A,
server_finished: Arc<(Mutex<bool>, Condvar)>,
server_started: Arc<(Mutex<bool>, Condvar)>,
) {
let listener = TcpListener::bind(addr).expect("start server failed");
// Signal that server is ready to accept connections
{
let (lock, cvar) = &*server_started;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
for stream in listener.incoming() {
_ = task!(
|mut socket| {
Expand Down Expand Up @@ -77,7 +88,14 @@ pub fn start_co_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bo
}
}

pub fn start_client<A: ToSocketAddrs>(addr: A) {
pub fn start_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
// Wait for server to be ready
{
let (lock, cvar) = &*server_started;
let _guard = cvar
.wait_while(lock.lock().unwrap(), |started| !*started)
.unwrap();
}
let mut stream =
open_coroutine::connect_timeout(addr, Duration::from_secs(3)).expect("connect failed");
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -154,13 +172,15 @@ pub fn main() -> std::io::Result<()> {
let addr = "127.0.0.1:8889";
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
let server_finished = Arc::clone(&server_finished_pair);
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
let server_started = Arc::clone(&server_started_pair);
_ = std::thread::Builder::new()
.name("crate_co_server".to_string())
.spawn(move || start_co_server(addr, server_finished_pair))
.spawn(move || start_co_server(addr, server_finished_pair, server_started_pair))
.expect("failed to spawn thread");
_ = std::thread::Builder::new()
.name("crate_client".to_string())
.spawn(move || start_client(addr))
.spawn(move || start_client(addr, server_started))
.expect("failed to spawn thread");

let (lock, cvar) = &*server_finished;
Expand Down
28 changes: 24 additions & 4 deletions open-coroutine/examples/socket_not_co.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,19 @@ use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Condvar)>) {
fn start_server<A: ToSocketAddrs>(
addr: A,
server_finished: Arc<(Mutex<bool>, Condvar)>,
server_started: Arc<(Mutex<bool>, Condvar)>,
) {
let listener = TcpListener::bind(addr).expect("start server failed");
// Signal that server is ready to accept connections
{
let (lock, cvar) = &*server_started;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
for stream in listener.incoming() {
let mut socket = stream.expect("accept new connection failed");
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -71,7 +82,14 @@ fn start_server<A: ToSocketAddrs>(addr: A, server_finished: Arc<(Mutex<bool>, Co
}
}

fn start_client<A: ToSocketAddrs>(addr: A) {
fn start_client<A: ToSocketAddrs>(addr: A, server_started: Arc<(Mutex<bool>, Condvar)>) {
// Wait for server to be ready
{
let (lock, cvar) = &*server_started;
let _guard = cvar
.wait_while(lock.lock().unwrap(), |started| !*started)
.unwrap();
}
let mut stream =
open_coroutine::connect_timeout(addr, Duration::from_secs(1)).expect("connect failed");
let mut buffer1 = [0; 256];
Expand Down Expand Up @@ -148,13 +166,15 @@ pub fn main() -> std::io::Result<()> {
let addr = "127.0.0.1:8888";
let server_finished_pair = Arc::new((Mutex::new(true), Condvar::new()));
let server_finished = Arc::clone(&server_finished_pair);
let server_started_pair = Arc::new((Mutex::new(false), Condvar::new()));
let server_started = Arc::clone(&server_started_pair);
_ = std::thread::Builder::new()
.name("crate_server".to_string())
.spawn(move || start_server(addr, server_finished_pair))
.spawn(move || start_server(addr, server_finished_pair, server_started_pair))
.expect("failed to spawn thread");
_ = std::thread::Builder::new()
.name("crate_client".to_string())
.spawn(move || start_client(addr))
.spawn(move || start_client(addr, server_started))
.expect("failed to spawn thread");

let (lock, cvar) = &*server_finished;
Expand Down
Loading