-
Notifications
You must be signed in to change notification settings - Fork 5
enhance stable #432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhance stable #432
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,6 +181,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| } | ||
|
|
||
| /// Returns `true` if the local queue is empty. | ||
| pub fn is_local_empty(&self) -> bool { | ||
| self.local_len() == 0 | ||
| } | ||
|
|
||
| /// Returns `true` if all the queues are empty. | ||
| pub fn is_empty(&self) -> bool { | ||
| self.len() == 0 | ||
| } | ||
|
|
@@ -194,19 +199,19 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| /// | ||
| /// let queue = OrderedWorkStealQueue::new(1, 2); | ||
| /// let local = queue.local_queue(); | ||
| /// assert!(local.is_empty()); | ||
| /// assert!(local.is_local_empty()); | ||
| /// for i in 0..2 { | ||
| /// local.push_with_priority(i, i); | ||
| /// } | ||
| /// assert!(local.is_full()); | ||
| /// assert!(local.is_local_full()); | ||
| /// assert_eq!(local.pop(), Some(0)); | ||
| /// assert_eq!(local.len(), 1); | ||
| /// assert_eq!(local.local_len(), 1); | ||
| /// assert_eq!(local.pop(), Some(1)); | ||
| /// assert_eq!(local.pop(), None); | ||
| /// assert!(local.is_empty()); | ||
| /// assert!(local.is_local_empty()); | ||
| /// ``` | ||
| pub fn is_full(&self) -> bool { | ||
| self.len() >= self.shared.local_capacity | ||
| pub fn is_local_full(&self) -> bool { | ||
| self.local_len() >= self.shared.local_capacity | ||
| } | ||
|
|
||
| fn max_steal(&self) -> usize { | ||
|
|
@@ -215,11 +220,11 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| .local_capacity | ||
| .saturating_add(1) | ||
| .saturating_div(2) | ||
| .saturating_sub(self.len()) | ||
| .saturating_sub(self.local_len()) | ||
| } | ||
|
|
||
| fn can_steal(&self) -> bool { | ||
| self.len() | ||
| self.local_len() | ||
| < self | ||
| .shared | ||
| .local_capacity | ||
|
|
@@ -228,10 +233,22 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| } | ||
|
|
||
| /// Returns the number of elements in the queue. | ||
| pub fn len(&self) -> usize { | ||
| pub fn local_len(&self) -> usize { | ||
| self.len.load(Ordering::Acquire) | ||
| } | ||
|
|
||
| /// Returns the number of elements in the all queues. | ||
| pub fn len(&self) -> usize { | ||
| let mut full_len = self.shared.len(); | ||
| for local_queue in &self.shared.local_queues { | ||
| for entry in local_queue { | ||
| let worker = entry.value(); | ||
| full_len += worker.capacity() - worker.spare_capacity(); | ||
| } | ||
| } | ||
| full_len | ||
|
loongs-zhang marked this conversation as resolved.
|
||
| } | ||
|
|
||
| fn try_lock(&self) -> bool { | ||
| self.stealing | ||
| .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) | ||
|
|
@@ -255,13 +272,14 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| /// for i in 0..4 { | ||
| /// local.push_with_priority(i, i); | ||
| /// } | ||
| /// for i in 0..4 { | ||
| /// assert_eq!(local.pop(), Some(i)); | ||
| /// } | ||
| /// assert_eq!(local.pop(), Some(0)); | ||
| /// assert_eq!(local.pop(), Some(3)); | ||
| /// assert_eq!(local.pop(), Some(1)); | ||
| /// assert_eq!(local.pop(), Some(2)); | ||
| /// assert_eq!(local.pop(), None); | ||
| /// ``` | ||
| pub fn push_with_priority(&self, priority: c_longlong, item: T) { | ||
| if self.is_full() { | ||
| if self.is_local_full() { | ||
| self.push_to_global(priority, item); | ||
| return; | ||
| } | ||
|
|
@@ -275,20 +293,28 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| } else { | ||
| //add count | ||
| self.len | ||
| .store(self.len().saturating_add(1), Ordering::Release); | ||
| .store(self.local_len().saturating_add(1), Ordering::Release); | ||
| } | ||
| } | ||
|
|
||
| fn push_to_global(&self, priority: c_longlong, item: T) { | ||
| //把本地队列的一半放到全局队列 | ||
| let count = self.len() / 2; | ||
| for _ in 0..count { | ||
| let count = self.local_len() / 2; | ||
| let mut done = 0; | ||
| while done < count { | ||
| for entry in self.queue.iter().rev() { | ||
| if done >= count { | ||
| break; | ||
| } | ||
| if let Some(item) = entry.value().pop() { | ||
| self.shared.push_with_priority(*entry.key(), item); | ||
| done += 1; | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
301
to
314
|
||
| // refresh count | ||
| self.len | ||
| .store(self.local_len().saturating_sub(count), Ordering::Release); | ||
| //直接放到全局队列 | ||
| self.shared.push_with_priority(priority, item); | ||
| } | ||
|
|
@@ -332,12 +358,12 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| /// for i in 2..6 { | ||
| /// local0.push_with_priority(i, i); | ||
| /// } | ||
| /// assert_eq!(local0.len(), 4); | ||
| /// assert_eq!(local0.local_len(), 4); | ||
| /// let local1 = queue.local_queue(); | ||
| /// for i in 0..2 { | ||
| /// local1.push_with_priority(i, i); | ||
| /// } | ||
| /// assert_eq!(local1.len(), 2); | ||
| /// assert_eq!(local1.local_len(), 2); | ||
| /// for i in 0..6 { | ||
| /// assert_eq!(local1.pop(), Some(i)); | ||
| /// } | ||
|
|
@@ -397,6 +423,13 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| }) | ||
| .is_ok() | ||
| { | ||
| // refresh local len | ||
| self.len.store( | ||
| self.local_len().saturating_add( | ||
| into_queue.capacity() - into_queue.spare_capacity(), | ||
| ), | ||
| Ordering::Release, | ||
| ); | ||
|
Comment on lines
+426
to
+432
|
||
| self.release_lock(); | ||
| return self.pop_local(); | ||
| } | ||
|
|
@@ -415,7 +448,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { | |
| if let Some(val) = entry.value().pop() { | ||
| // Decrement the count. | ||
| self.len | ||
| .store(self.len().saturating_sub(1), Ordering::Release); | ||
| .store(self.local_len().saturating_sub(1), Ordering::Release); | ||
| return Some(val); | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.