Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion core/src/common/ordered_work_steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,22 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
}

/// Returns `true` if the local queue is empty.
///
/// When the `len` counter indicates non-empty, this method verifies by
/// scanning all Workers. The counter can become stale-high when sibling
/// schedulers steal items via `stealer().steal()` without decrementing
/// our counter.
pub fn is_empty(&self) -> bool {
self.len() == 0
if self.len() == 0 {
Comment thread
loongs-zhang marked this conversation as resolved.
Outdated
return true;
}
// len might be stale due to concurrent work-stealing — verify workers
for entry in self.queue {
if !entry.value().is_empty() {
return false;
Comment thread
loongs-zhang marked this conversation as resolved.
Outdated
}
}
true
}
Comment thread
loongs-zhang marked this conversation as resolved.

/// Returns `true` if the local queue is full.
Expand Down
8 changes: 4 additions & 4 deletions core/src/net/operator/windows/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
server_started.store(true, Ordering::Release);

operator.accept(
token_alloc.insert(Token::Accept),
token_alloc.insert(Token::Accept) as _,
listener.as_raw_socket() as _,
Comment thread
loongs-zhang marked this conversation as resolved.
std::ptr::null_mut(),
std::ptr::null_mut(),
Expand All @@ -77,7 +77,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
let (_, mut cq, _) = operator.select(None, 1)?;
for cqe in &mut cq {
let token_index = cqe.token;
let token = &mut token_alloc[token_index];
let token = &mut token_alloc[token_index as _];
match token.clone() {
Token::Accept => {
Comment thread
loongs-zhang marked this conversation as resolved.
println!("server accepted");
Expand Down Expand Up @@ -107,7 +107,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
let ret = cqe.result as _;
if ret == 0 {
bufpool.push(buf_index);
_ = token_alloc.remove(token_index);
_ = token_alloc.remove(token_index as _);
println!("shutdown connection1");
_ = unsafe { closesocket(fd) };
println!("Server closed1");
Expand Down Expand Up @@ -149,7 +149,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
.is_err()
{
bufpool.push(buf_index);
_ = token_alloc.remove(token_index);
_ = token_alloc.remove(token_index as _);
println!("shutdown connection2");
_ = unsafe { closesocket(fd) };
println!("Server closed2");
Expand Down
Loading