Skip to content

Commit a60d251

Browse files
authored
enhance stable (#432)
2 parents 5fabaa8 + dcf065d commit a60d251

2 files changed

Lines changed: 56 additions & 23 deletions

File tree

core/src/common/ordered_work_steal.rs

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
181181
}
182182

183183
/// Returns `true` if the local queue is empty.
184+
pub fn is_local_empty(&self) -> bool {
185+
self.local_len() == 0
186+
}
187+
188+
/// Returns `true` if all the queues are empty.
184189
pub fn is_empty(&self) -> bool {
185190
self.len() == 0
186191
}
@@ -194,19 +199,19 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
194199
///
195200
/// let queue = OrderedWorkStealQueue::new(1, 2);
196201
/// let local = queue.local_queue();
197-
/// assert!(local.is_empty());
202+
/// assert!(local.is_local_empty());
198203
/// for i in 0..2 {
199204
/// local.push_with_priority(i, i);
200205
/// }
201-
/// assert!(local.is_full());
206+
/// assert!(local.is_local_full());
202207
/// assert_eq!(local.pop(), Some(0));
203-
/// assert_eq!(local.len(), 1);
208+
/// assert_eq!(local.local_len(), 1);
204209
/// assert_eq!(local.pop(), Some(1));
205210
/// assert_eq!(local.pop(), None);
206-
/// assert!(local.is_empty());
211+
/// assert!(local.is_local_empty());
207212
/// ```
208-
pub fn is_full(&self) -> bool {
209-
self.len() >= self.shared.local_capacity
213+
pub fn is_local_full(&self) -> bool {
214+
self.local_len() >= self.shared.local_capacity
210215
}
211216

212217
fn max_steal(&self) -> usize {
@@ -215,11 +220,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
215220
.local_capacity
216221
.saturating_add(1)
217222
.saturating_div(2)
218-
.saturating_sub(self.len())
223+
.saturating_sub(self.local_len())
219224
}
220225

221226
fn can_steal(&self) -> bool {
222-
self.len()
227+
self.local_len()
223228
< self
224229
.shared
225230
.local_capacity
@@ -228,10 +233,22 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
228233
}
229234

230235
/// Returns the number of elements in the queue.
231-
pub fn len(&self) -> usize {
236+
pub fn local_len(&self) -> usize {
232237
self.len.load(Ordering::Acquire)
233238
}
234239

240+
/// Returns the number of elements in the all queues.
241+
pub fn len(&self) -> usize {
242+
let mut full_len = self.shared.len();
243+
for local_queue in &self.shared.local_queues {
244+
for entry in local_queue {
245+
let worker = entry.value();
246+
full_len += worker.capacity() - worker.spare_capacity();
247+
}
248+
}
249+
full_len
250+
}
251+
235252
fn try_lock(&self) -> bool {
236253
self.stealing
237254
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
@@ -255,13 +272,14 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
255272
/// for i in 0..4 {
256273
/// local.push_with_priority(i, i);
257274
/// }
258-
/// for i in 0..4 {
259-
/// assert_eq!(local.pop(), Some(i));
260-
/// }
275+
/// assert_eq!(local.pop(), Some(0));
276+
/// assert_eq!(local.pop(), Some(3));
277+
/// assert_eq!(local.pop(), Some(1));
278+
/// assert_eq!(local.pop(), Some(2));
261279
/// assert_eq!(local.pop(), None);
262280
/// ```
263281
pub fn push_with_priority(&self, priority: c_longlong, item: T) {
264-
if self.is_full() {
282+
if self.is_local_full() {
265283
self.push_to_global(priority, item);
266284
return;
267285
}
@@ -275,20 +293,28 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
275293
} else {
276294
//add count
277295
self.len
278-
.store(self.len().saturating_add(1), Ordering::Release);
296+
.store(self.local_len().saturating_add(1), Ordering::Release);
279297
}
280298
}
281299

282300
fn push_to_global(&self, priority: c_longlong, item: T) {
283301
//把本地队列的一半放到全局队列
284-
let count = self.len() / 2;
285-
for _ in 0..count {
302+
let count = self.local_len() / 2;
303+
let mut done = 0;
304+
while done < count {
286305
for entry in self.queue.iter().rev() {
306+
if done >= count {
307+
break;
308+
}
287309
if let Some(item) = entry.value().pop() {
288310
self.shared.push_with_priority(*entry.key(), item);
311+
done += 1;
289312
}
290313
}
291314
}
315+
// refresh count
316+
self.len
317+
.store(self.local_len().saturating_sub(count), Ordering::Release);
292318
//直接放到全局队列
293319
self.shared.push_with_priority(priority, item);
294320
}
@@ -332,12 +358,12 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
332358
/// for i in 2..6 {
333359
/// local0.push_with_priority(i, i);
334360
/// }
335-
/// assert_eq!(local0.len(), 4);
361+
/// assert_eq!(local0.local_len(), 4);
336362
/// let local1 = queue.local_queue();
337363
/// for i in 0..2 {
338364
/// local1.push_with_priority(i, i);
339365
/// }
340-
/// assert_eq!(local1.len(), 2);
366+
/// assert_eq!(local1.local_len(), 2);
341367
/// for i in 0..6 {
342368
/// assert_eq!(local1.pop(), Some(i));
343369
/// }
@@ -397,6 +423,13 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
397423
})
398424
.is_ok()
399425
{
426+
// refresh local len
427+
self.len.store(
428+
self.local_len().saturating_add(
429+
into_queue.capacity() - into_queue.spare_capacity(),
430+
),
431+
Ordering::Release,
432+
);
400433
self.release_lock();
401434
return self.pop_local();
402435
}
@@ -415,7 +448,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
415448
if let Some(val) = entry.value().pop() {
416449
// Decrement the count.
417450
self.len
418-
.store(self.len().saturating_sub(1), Ordering::Release);
451+
.store(self.local_len().saturating_sub(1), Ordering::Release);
419452
return Some(val);
420453
}
421454
}

core/src/net/operator/windows/tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
6666
server_started.store(true, Ordering::Release);
6767

6868
operator.accept(
69-
token_alloc.insert(Token::Accept),
69+
token_alloc.insert(Token::Accept) as _,
7070
listener.as_raw_socket() as _,
7171
std::ptr::null_mut(),
7272
std::ptr::null_mut(),
@@ -77,7 +77,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
7777
let (_, mut cq, _) = operator.select(None, 1)?;
7878
for cqe in &mut cq {
7979
let token_index = cqe.token;
80-
let token = &mut token_alloc[token_index];
80+
let token = &mut token_alloc[token_index as _];
8181
match token.clone() {
8282
Token::Accept => {
8383
println!("server accepted");
@@ -107,7 +107,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
107107
let ret = cqe.result as _;
108108
if ret == 0 {
109109
bufpool.push(buf_index);
110-
_ = token_alloc.remove(token_index);
110+
_ = token_alloc.remove(token_index as _);
111111
println!("shutdown connection1");
112112
_ = unsafe { closesocket(fd) };
113113
println!("Server closed1");
@@ -149,7 +149,7 @@ fn crate_server2(port: u16, server_started: Arc<AtomicBool>) -> anyhow::Result<(
149149
.is_err()
150150
{
151151
bufpool.push(buf_index);
152-
_ = token_alloc.remove(token_index);
152+
_ = token_alloc.remove(token_index as _);
153153
println!("shutdown connection2");
154154
_ = unsafe { closesocket(fd) };
155155
println!("Server closed2");

0 commit comments

Comments
 (0)