Skip to content

Commit b2153e7

Browse files
Fix trait Source
1 parent c918d73 commit b2153e7

5 files changed

Lines changed: 35 additions & 38 deletions

File tree

src/sync/channel.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::shared_state::{SharedState, Source};
22
use crate::sealed;
33
use std::cell::Cell;
4+
use std::ops::ControlFlow;
45
use std::pin::Pin;
56
use std::rc::Rc;
67
use std::task::{Context, Poll};
@@ -14,12 +15,14 @@ struct Data<T> {
1415
impl<T> Source for Data<T> {
1516
type Item = T;
1617

17-
fn closed(&self) -> bool {
18-
self.sender_count.get() == 0
19-
}
20-
21-
fn extract_item(&self) -> Option<Self::Item> {
22-
self.queue.pop()
18+
fn try_yield_one(&self) -> ControlFlow<Option<Self::Item>> {
19+
if let Some(item) = self.queue.pop() {
20+
ControlFlow::Break(Some(item))
21+
} else if self.sender_count.get() == 0 {
22+
ControlFlow::Break(None)
23+
} else {
24+
ControlFlow::Continue(())
25+
}
2326
}
2427
}
2528

src/sync/condvar.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::shared_state::{SharedState, Source};
22
use futures::FutureExt;
33
use std::cell::Cell;
44
use std::future::{poll_fn, Future};
5+
use std::ops::ControlFlow;
56
use std::rc::Rc;
67

78
struct Data {
@@ -14,16 +15,13 @@ struct Data {
1415
impl Source for Data {
1516
type Item = ();
1617

17-
fn closed(&self) -> bool {
18-
!self.has_sender.get()
19-
}
20-
21-
fn extract_item(&self) -> Option<Self::Item> {
22-
if !self.closed() && self.notified.get() {
23-
self.notified.set(false);
24-
Some(())
18+
fn try_yield_one(&self) -> ControlFlow<Option<Self::Item>> {
19+
if !self.has_sender.get() {
20+
ControlFlow::Break(None)
21+
} else if self.notified.replace(false) {
22+
ControlFlow::Break(Some(()))
2523
} else {
26-
None
24+
ControlFlow::Continue(())
2725
}
2826
}
2927
}

src/sync/oneshot.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::shared_state::{SharedState, Source};
22
use std::cell::Cell;
33
use std::future::Future;
4+
use std::ops::ControlFlow;
45
use std::pin::Pin;
56
use std::rc::Rc;
67
use std::task::{Context, Poll};
@@ -14,12 +15,12 @@ struct Data<T> {
1415
impl<T> Source for Data<T> {
1516
type Item = T;
1617

17-
fn closed(&self) -> bool {
18-
!self.has_sender.get()
19-
}
20-
21-
fn extract_item(&self) -> Option<Self::Item> {
22-
self.value.replace(None)
18+
fn try_yield_one(&self) -> ControlFlow<Option<Self::Item>> {
19+
match self.value.take() {
20+
Some(value) => ControlFlow::Break(Some(value)),
21+
None if !self.has_sender.get() => ControlFlow::Break(None),
22+
None => ControlFlow::Continue(()),
23+
}
2324
}
2425
}
2526

src/sync/semaphore.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::shared_state::{SharedState, Source};
22
use futures::FutureExt;
33
use std::cell::Cell;
44
use std::future::{poll_fn, Future};
5+
use std::ops::ControlFlow;
56
use std::rc::Rc;
67

78
struct Data {
@@ -14,17 +15,14 @@ struct Data {
1415
impl Source for Data {
1516
type Item = ();
1617

17-
fn closed(&self) -> bool {
18-
!self.has_sender.get()
19-
}
20-
21-
fn extract_item(&self) -> Option<Self::Item> {
22-
let current_capacity = self.capacity.get();
23-
if !self.closed() && current_capacity > 0 {
24-
self.capacity.set(current_capacity - 1);
25-
Some(())
18+
fn try_yield_one(&self) -> ControlFlow<Option<Self::Item>> {
19+
if !self.has_sender.get() {
20+
ControlFlow::Break(None)
21+
} else if self.capacity.get() > 0 {
22+
self.capacity.update(|cap| cap - 1);
23+
ControlFlow::Break(Some(()))
2624
} else {
27-
None
25+
ControlFlow::Continue(())
2826
}
2927
}
3028
}

src/sync/shared_state.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use std::cell::Cell;
2-
use std::ops::Deref;
2+
use std::ops::{ControlFlow, Deref};
33
use std::rc::Rc;
44
use std::task::{Context, Poll, Waker};
55

66
pub(super) trait Source {
77
type Item;
8-
fn closed(&self) -> bool;
9-
fn extract_item(&self) -> Option<Self::Item>;
8+
fn try_yield_one(&self) -> ControlFlow<Option<Self::Item>>;
109
}
1110

1211
pub(super) struct SharedState<T> {
@@ -36,10 +35,8 @@ impl<T: Source> SharedState<T> {
3635
// This should NEVER be called concurrently from different futures/tasks,
3736
// because we store only 1 waker
3837
pub(super) fn poll_wait(self: &mut Rc<Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
39-
if let Some(item) = self.inner.extract_item() {
40-
Poll::Ready(Some(item))
41-
} else if self.inner.closed() {
42-
Poll::Ready(None)
38+
if let ControlFlow::Break(output) = self.inner.try_yield_one() {
39+
Poll::Ready(output)
4340
} else {
4441
let new_waker = match self.waker.replace(None) {
4542
Some(waker) if waker.will_wake(cx.waker()) => waker,

0 commit comments

Comments
 (0)