Skip to content

Commit dcf065d

Browse files
committed
fix count bug in OrderedLocalQueue
1 parent 30280fd commit dcf065d

1 file changed

Lines changed: 53 additions & 34 deletions

File tree

core/src/common/ordered_work_steal.rs

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -181,22 +181,13 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
181181
}
182182

183183
/// Returns `true` if the local queue is empty.
184-
///
185-
/// When the `len` counter indicates non-empty, this method verifies by
186-
/// scanning all Workers. The counter can become stale-high when sibling
187-
/// schedulers steal items via `stealer().steal()` without decrementing
188-
/// our counter.
184+
pub fn is_local_empty(&self) -> bool {
185+
self.local_len() == 0
186+
}
187+
188+
/// Returns `true` if all the queues are empty.
189189
pub fn is_empty(&self) -> bool {
190-
if self.len() == 0 {
191-
return true;
192-
}
193-
// len might be stale due to concurrent work-stealing — verify workers
194-
for entry in self.queue {
195-
if !entry.value().is_empty() {
196-
return false;
197-
}
198-
}
199-
true
190+
self.len() == 0
200191
}
201192

202193
/// Returns `true` if the local queue is full.
@@ -208,19 +199,19 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
208199
///
209200
/// let queue = OrderedWorkStealQueue::new(1, 2);
210201
/// let local = queue.local_queue();
211-
/// assert!(local.is_empty());
202+
/// assert!(local.is_local_empty());
212203
/// for i in 0..2 {
213204
/// local.push_with_priority(i, i);
214205
/// }
215-
/// assert!(local.is_full());
206+
/// assert!(local.is_local_full());
216207
/// assert_eq!(local.pop(), Some(0));
217-
/// assert_eq!(local.len(), 1);
208+
/// assert_eq!(local.local_len(), 1);
218209
/// assert_eq!(local.pop(), Some(1));
219210
/// assert_eq!(local.pop(), None);
220-
/// assert!(local.is_empty());
211+
/// assert!(local.is_local_empty());
221212
/// ```
222-
pub fn is_full(&self) -> bool {
223-
self.len() >= self.shared.local_capacity
213+
pub fn is_local_full(&self) -> bool {
214+
self.local_len() >= self.shared.local_capacity
224215
}
225216

226217
fn max_steal(&self) -> usize {
@@ -229,11 +220,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
229220
.local_capacity
230221
.saturating_add(1)
231222
.saturating_div(2)
232-
.saturating_sub(self.len())
223+
.saturating_sub(self.local_len())
233224
}
234225

235226
fn can_steal(&self) -> bool {
236-
self.len()
227+
self.local_len()
237228
< self
238229
.shared
239230
.local_capacity
@@ -242,10 +233,22 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
242233
}
243234

244235
/// Returns the number of elements in the queue.
245-
pub fn len(&self) -> usize {
236+
pub fn local_len(&self) -> usize {
246237
self.len.load(Ordering::Acquire)
247238
}
248239

240+
/// Returns the number of elements in the all queues.
241+
pub fn len(&self) -> usize {
242+
let mut full_len = self.shared.len();
243+
for local_queue in &self.shared.local_queues {
244+
for entry in local_queue {
245+
let worker = entry.value();
246+
full_len += worker.capacity() - worker.spare_capacity();
247+
}
248+
}
249+
full_len
250+
}
251+
249252
fn try_lock(&self) -> bool {
250253
self.stealing
251254
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
@@ -269,13 +272,14 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
269272
/// for i in 0..4 {
270273
/// local.push_with_priority(i, i);
271274
/// }
272-
/// for i in 0..4 {
273-
/// assert_eq!(local.pop(), Some(i));
274-
/// }
275+
/// assert_eq!(local.pop(), Some(0));
276+
/// assert_eq!(local.pop(), Some(3));
277+
/// assert_eq!(local.pop(), Some(1));
278+
/// assert_eq!(local.pop(), Some(2));
275279
/// assert_eq!(local.pop(), None);
276280
/// ```
277281
pub fn push_with_priority(&self, priority: c_longlong, item: T) {
278-
if self.is_full() {
282+
if self.is_local_full() {
279283
self.push_to_global(priority, item);
280284
return;
281285
}
@@ -289,20 +293,28 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
289293
} else {
290294
//add count
291295
self.len
292-
.store(self.len().saturating_add(1), Ordering::Release);
296+
.store(self.local_len().saturating_add(1), Ordering::Release);
293297
}
294298
}
295299

296300
fn push_to_global(&self, priority: c_longlong, item: T) {
297301
//把本地队列的一半放到全局队列
298-
let count = self.len() / 2;
299-
for _ in 0..count {
302+
let count = self.local_len() / 2;
303+
let mut done = 0;
304+
while done < count {
300305
for entry in self.queue.iter().rev() {
306+
if done >= count {
307+
break;
308+
}
301309
if let Some(item) = entry.value().pop() {
302310
self.shared.push_with_priority(*entry.key(), item);
311+
done += 1;
303312
}
304313
}
305314
}
315+
// refresh count
316+
self.len
317+
.store(self.local_len().saturating_sub(count), Ordering::Release);
306318
//直接放到全局队列
307319
self.shared.push_with_priority(priority, item);
308320
}
@@ -346,12 +358,12 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
346358
/// for i in 2..6 {
347359
/// local0.push_with_priority(i, i);
348360
/// }
349-
/// assert_eq!(local0.len(), 4);
361+
/// assert_eq!(local0.local_len(), 4);
350362
/// let local1 = queue.local_queue();
351363
/// for i in 0..2 {
352364
/// local1.push_with_priority(i, i);
353365
/// }
354-
/// assert_eq!(local1.len(), 2);
366+
/// assert_eq!(local1.local_len(), 2);
355367
/// for i in 0..6 {
356368
/// assert_eq!(local1.pop(), Some(i));
357369
/// }
@@ -411,6 +423,13 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
411423
})
412424
.is_ok()
413425
{
426+
// refresh local len
427+
self.len.store(
428+
self.local_len().saturating_add(
429+
into_queue.capacity() - into_queue.spare_capacity(),
430+
),
431+
Ordering::Release,
432+
);
414433
self.release_lock();
415434
return self.pop_local();
416435
}
@@ -429,7 +448,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
429448
if let Some(val) = entry.value().pop() {
430449
// Decrement the count.
431450
self.len
432-
.store(self.len().saturating_sub(1), Ordering::Release);
451+
.store(self.local_len().saturating_sub(1), Ordering::Release);
433452
return Some(val);
434453
}
435454
}

0 commit comments

Comments
 (0)