Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions core/src/common/ordered_work_steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
}

/// Returns `true` if the local queue is empty.
pub fn is_local_empty(&self) -> bool {
self.local_len() == 0
}

/// Returns `true` if all the queues are empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
Comment thread
loongs-zhang marked this conversation as resolved.
Expand All @@ -194,19 +199,19 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
///
/// let queue = OrderedWorkStealQueue::new(1, 2);
/// let local = queue.local_queue();
/// assert!(local.is_empty());
/// assert!(local.is_local_empty());
/// for i in 0..2 {
/// local.push_with_priority(i, i);
/// }
/// assert!(local.is_full());
/// assert!(local.is_local_full());
/// assert_eq!(local.pop(), Some(0));
/// assert_eq!(local.len(), 1);
/// assert_eq!(local.local_len(), 1);
/// assert_eq!(local.pop(), Some(1));
/// assert_eq!(local.pop(), None);
/// assert!(local.is_empty());
/// assert!(local.is_local_empty());
/// ```
pub fn is_full(&self) -> bool {
self.len() >= self.shared.local_capacity
pub fn is_local_full(&self) -> bool {
self.local_len() >= self.shared.local_capacity
}

fn max_steal(&self) -> usize {
Expand All @@ -215,11 +220,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
.local_capacity
.saturating_add(1)
.saturating_div(2)
.saturating_sub(self.len())
.saturating_sub(self.local_len())
}

fn can_steal(&self) -> bool {
self.len()
self.local_len()
< self
.shared
.local_capacity
Expand All @@ -228,10 +233,22 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
}

/// Returns the number of elements in the queue.
pub fn len(&self) -> usize {
pub fn local_len(&self) -> usize {
self.len.load(Ordering::Acquire)
}

/// Returns the number of elements in the all queues.
pub fn len(&self) -> usize {
let mut full_len = self.shared.len();
for local_queue in &self.shared.local_queues {
for entry in local_queue {
let worker = entry.value();
full_len += worker.capacity() - worker.spare_capacity();
}
}
full_len
Comment thread
loongs-zhang marked this conversation as resolved.
}

fn try_lock(&self) -> bool {
self.stealing
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
Expand All @@ -255,13 +272,14 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// for i in 0..4 {
/// local.push_with_priority(i, i);
/// }
/// for i in 0..4 {
/// assert_eq!(local.pop(), Some(i));
/// }
/// assert_eq!(local.pop(), Some(0));
/// assert_eq!(local.pop(), Some(3));
/// assert_eq!(local.pop(), Some(1));
/// assert_eq!(local.pop(), Some(2));
/// assert_eq!(local.pop(), None);
/// ```
pub fn push_with_priority(&self, priority: c_longlong, item: T) {
if self.is_full() {
if self.is_local_full() {
self.push_to_global(priority, item);
return;
}
Expand All @@ -275,20 +293,28 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
} else {
//add count
self.len
.store(self.len().saturating_add(1), Ordering::Release);
.store(self.local_len().saturating_add(1), Ordering::Release);
}
}

fn push_to_global(&self, priority: c_longlong, item: T) {
//把本地队列的一半放到全局队列
let count = self.len() / 2;
for _ in 0..count {
let count = self.local_len() / 2;
let mut done = 0;
while done < count {
for entry in self.queue.iter().rev() {
if done >= count {
break;
}
if let Some(item) = entry.value().pop() {
self.shared.push_with_priority(*entry.key(), item);
done += 1;
}
}
}
Comment on lines 301 to 314
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push_to_global subtracts count from the local counter, but the nested loops can pop more (or fewer) than count items because each outer iteration can pop up to one item per priority entry. This will desynchronize local_len() from the actual number of local items. Track the exact number of items actually moved to the shared queue and adjust the local counter by that value (or recompute local occupancy after the transfer).

Copilot uses AI. Check for mistakes.
// refresh count
self.len
.store(self.local_len().saturating_sub(count), Ordering::Release);
//直接放到全局队列
self.shared.push_with_priority(priority, item);
}
Expand Down Expand Up @@ -332,12 +358,12 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// for i in 2..6 {
/// local0.push_with_priority(i, i);
/// }
/// assert_eq!(local0.len(), 4);
/// assert_eq!(local0.local_len(), 4);
/// let local1 = queue.local_queue();
/// for i in 0..2 {
/// local1.push_with_priority(i, i);
/// }
/// assert_eq!(local1.len(), 2);
/// assert_eq!(local1.local_len(), 2);
/// for i in 0..6 {
/// assert_eq!(local1.pop(), Some(i));
/// }
Expand Down Expand Up @@ -397,6 +423,13 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
})
.is_ok()
{
// refresh local len
self.len.store(
self.local_len().saturating_add(
into_queue.capacity() - into_queue.spare_capacity(),
),
Ordering::Release,
);
Comment on lines +426 to +432
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a successful sibling steal, the “refresh local len” logic adds into_queue.capacity() - into_queue.spare_capacity() to the existing local_len(). This value is the total current size of into_queue, not the number of newly stolen items, so it can overcount (especially when the priority bucket already had items). Consider capturing the stolen count inside the steal closure or recomputing the full local occupancy across all buckets and storing that.

Copilot uses AI. Check for mistakes.
self.release_lock();
return self.pop_local();
}
Expand All @@ -415,7 +448,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
if let Some(val) = entry.value().pop() {
// Decrement the count.
self.len
.store(self.len().saturating_sub(1), Ordering::Release);
.store(self.local_len().saturating_sub(1), Ordering::Release);
return Some(val);
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/net/operator/windows/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
server_started.store(true, Ordering::Release);

operator.accept(
token_alloc.insert(Token::Accept),
token_alloc.insert(Token::Accept) as _,
listener.as_raw_socket() as _,
Comment thread
loongs-zhang marked this conversation as resolved.
std::ptr::null_mut(),
std::ptr::null_mut(),
Expand All @@ -77,7 +77,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
let (_, mut cq, _) = operator.select(None, 1)?;
for cqe in &mut cq {
let token_index = cqe.token;
let token = &mut token_alloc[token_index];
let token = &mut token_alloc[token_index as _];
match token.clone() {
Token::Accept => {
Comment thread
loongs-zhang marked this conversation as resolved.
println!("server accepted");
Expand Down Expand Up @@ -107,7 +107,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
let ret = cqe.result as _;
if ret == 0 {
bufpool.push(buf_index);
_ = token_alloc.remove(token_index);
_ = token_alloc.remove(token_index as _);
println!("shutdown connection1");
_ = unsafe { closesocket(fd) };
println!("Server closed1");
Expand Down Expand Up @@ -149,7 +149,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
.is_err()
{
bufpool.push(buf_index);
_ = token_alloc.remove(token_index);
_ = token_alloc.remove(token_index as _);
println!("shutdown connection2");
_ = unsafe { closesocket(fd) };
println!("Server closed2");
Expand Down
Loading