Skip to content

Commit 4484b46

Browse files
Bounded spsc channel
1 parent 5611f7a commit 4484b46

5 files changed

Lines changed: 316 additions & 26 deletions

File tree

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ pub mod prelude {
88
pub use crate::sealed;
99
pub use crate::shared::*;
1010
pub use crate::stopwatch::Stopwatch;
11-
pub use crate::sync::channel as local_channel;
11+
pub use crate::sync::bounded as local_bounded;
1212
pub use crate::sync::condvar as local_condvar;
1313
pub use crate::sync::oneshot as local_oneshot;
1414
pub use crate::sync::semaphore as local_semaphore;
15+
pub use crate::sync::unbounded as local_unbounded;
1516
pub use crate::{
1617
debug_stopwatch, error_stopwatch, info_stopwatch, trace_stopwatch, warn_stopwatch,
1718
};

src/sync/bounded.rs

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
use crate::shared::UnsafeShared;
2+
use futures::Stream;
3+
use std::cell::UnsafeCell;
4+
use std::collections::VecDeque;
5+
use std::rc::Rc;
6+
use std::task::{Context, Poll, Waker};
7+
use std::{fmt, io};
8+
use std::{future::poll_fn, pin::Pin};
9+
10+
struct State<T> {
11+
queue: VecDeque<T>,
12+
tx_waker: Option<Waker>,
13+
rx_waker: Option<Waker>,
14+
has_tx: bool,
15+
has_rx: bool,
16+
}
17+
18+
fn replace_waker(old_waker: &mut Option<Waker>, cx: &mut Context) {
19+
if old_waker.as_ref().is_none_or(|w| !w.will_wake(cx.waker())) {
20+
old_waker.replace(cx.waker().clone());
21+
}
22+
}
23+
24+
fn take_and_wake(waker: &mut Option<Waker>) {
25+
waker.take().inspect(Waker::wake_by_ref);
26+
}
27+
28+
#[derive(PartialEq, Eq)]
29+
pub enum TrySendError<T> {
30+
Full(T),
31+
Closed(T),
32+
}
33+
34+
#[derive(PartialEq, Eq)]
35+
pub enum SendError<T> {
36+
Closed(T),
37+
}
38+
39+
/// Bounded SPSC channel
40+
pub fn channel<T>(limit: usize) -> (Sender<T>, Receiver<T>) {
41+
let shared = Rc::new(UnsafeCell::new(State {
42+
queue: VecDeque::with_capacity(limit),
43+
tx_waker: None,
44+
rx_waker: None,
45+
has_tx: true,
46+
has_rx: true,
47+
}));
48+
(Sender(shared.clone()), Receiver(shared))
49+
}
50+
51+
pub struct Sender<T>(Rc<UnsafeCell<State<T>>>);
52+
53+
impl<T> Sender<T> {
54+
pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
55+
let can_send = poll_fn(|cx| self.poll_ready(cx)).await;
56+
if can_send {
57+
unsafe {
58+
self.0.with_unchecked(|state| {
59+
state.queue.push_back(item);
60+
take_and_wake(&mut state.rx_waker);
61+
})
62+
}
63+
Ok(())
64+
} else {
65+
Err(SendError::Closed(item))
66+
}
67+
}
68+
69+
pub async fn closed(&mut self) {
70+
poll_fn(|cx| self.poll_closed(cx)).await
71+
}
72+
73+
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
74+
unsafe {
75+
self.0.with_unchecked(|state| {
76+
if !state.has_rx {
77+
Err(TrySendError::Closed(item))
78+
} else if state.queue.len() < state.queue.capacity() {
79+
state.queue.push_back(item);
80+
take_and_wake(&mut state.rx_waker);
81+
Ok(())
82+
} else {
83+
Err(TrySendError::Full(item))
84+
}
85+
})
86+
}
87+
}
88+
89+
pub fn is_closed(&self) -> bool {
90+
unsafe { !(&*self.0.get()).has_rx }
91+
}
92+
93+
fn poll_ready(&mut self, cx: &mut Context) -> Poll<bool> {
94+
unsafe {
95+
self.0.with_unchecked(|state| {
96+
if !state.has_rx {
97+
Poll::Ready(false)
98+
} else if state.queue.len() < state.queue.capacity() {
99+
Poll::Ready(true)
100+
} else {
101+
replace_waker(&mut state.tx_waker, cx);
102+
Poll::Pending
103+
}
104+
})
105+
}
106+
}
107+
108+
fn poll_closed(&mut self, cx: &mut Context) -> Poll<()> {
109+
unsafe {
110+
self.0.with_unchecked(|state| {
111+
if !state.has_rx {
112+
Poll::Ready(())
113+
} else {
114+
replace_waker(&mut state.tx_waker, cx);
115+
Poll::Pending
116+
}
117+
})
118+
}
119+
}
120+
}
121+
122+
impl<T> Drop for Sender<T> {
123+
fn drop(&mut self) {
124+
unsafe {
125+
self.0.with_unchecked(|state| {
126+
state.has_tx = false;
127+
state.tx_waker = None;
128+
take_and_wake(&mut state.rx_waker);
129+
})
130+
}
131+
}
132+
}
133+
134+
pub struct Receiver<T>(Rc<UnsafeCell<State<T>>>);
135+
136+
impl<T> Receiver<T> {
137+
pub fn is_closed(&self) -> bool {
138+
unsafe { !(&*self.0.get()).has_tx }
139+
}
140+
}
141+
142+
impl<T> Stream for Receiver<T> {
143+
type Item = T;
144+
145+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146+
unsafe {
147+
self.0.with_unchecked(|state| {
148+
if let Some(item) = state.queue.pop_front() {
149+
take_and_wake(&mut state.tx_waker);
150+
Poll::Ready(Some(item))
151+
} else if !state.has_tx {
152+
Poll::Ready(None)
153+
} else {
154+
replace_waker(&mut state.rx_waker, cx);
155+
Poll::Pending
156+
}
157+
})
158+
}
159+
}
160+
}
161+
162+
impl<T> Drop for Receiver<T> {
163+
fn drop(&mut self) {
164+
unsafe {
165+
self.0.with_unchecked(|state| {
166+
state.has_rx = false;
167+
take_and_wake(&mut state.tx_waker);
168+
})
169+
}
170+
}
171+
}
172+
173+
impl<T> fmt::Debug for TrySendError<T> {
174+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175+
match self {
176+
TrySendError::Full(_) => f.write_str("TrySendError::Full(..)"),
177+
TrySendError::Closed(_) => f.write_str("TrySendError::Closed(..)"),
178+
}
179+
}
180+
}
181+
182+
impl<T> fmt::Display for TrySendError<T> {
183+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184+
match self {
185+
TrySendError::Full(_) => f.write_str("channel is full"),
186+
TrySendError::Closed(_) => f.write_str("channel is closed"),
187+
}
188+
}
189+
}
190+
191+
impl<T> std::error::Error for TrySendError<T> {}
192+
193+
impl<T> From<TrySendError<T>> for io::Error {
194+
fn from(err: TrySendError<T>) -> Self {
195+
let source = format!("{err}");
196+
match err {
197+
TrySendError::Full(_) => io::Error::new(io::ErrorKind::StorageFull, source),
198+
TrySendError::Closed(_) => io::Error::new(io::ErrorKind::BrokenPipe, source),
199+
}
200+
}
201+
}
202+
203+
impl<T> fmt::Debug for SendError<T> {
204+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205+
match self {
206+
SendError::Closed(_) => f.write_str("SendError::Closed(..)"),
207+
}
208+
}
209+
}
210+
211+
impl<T> fmt::Display for SendError<T> {
212+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213+
match self {
214+
SendError::Closed(_) => f.write_str("channel is closed"),
215+
}
216+
}
217+
}
218+
219+
impl<T> std::error::Error for SendError<T> {}
220+
221+
impl<T> From<SendError<T>> for io::Error {
222+
fn from(err: SendError<T>) -> Self {
223+
let source = format!("{err}");
224+
match err {
225+
SendError::Closed(_) => io::Error::new(io::ErrorKind::BrokenPipe, source),
226+
}
227+
}
228+
}
229+
230+
#[cfg(test)]
231+
mod tests {
232+
use super::*;
233+
use static_assertions::assert_not_impl_any;
234+
use std::sync::Arc;
235+
use tokio_test::task::spawn;
236+
use tokio_test::{assert_pending, assert_ready};
237+
238+
#[test]
239+
fn test_channel_static_properties() {
240+
assert_not_impl_any!(Arc<Sender<usize>>: std::marker::Send, Sync);
241+
assert_not_impl_any!(Arc<Receiver<usize>>: std::marker::Send, Sync);
242+
assert_not_impl_any!(Sender<usize>: std::marker::Send, Sync, Clone);
243+
assert_not_impl_any!(Receiver<usize>: std::marker::Send, Sync, Clone);
244+
}
245+
246+
#[test]
247+
fn test_sender_notifies_receiver() {
248+
let (mut sender, receiver) = channel::<i32>(2);
249+
250+
let mut receiver = spawn(receiver);
251+
assert_pending!(receiver.poll_next());
252+
253+
assert_eq!(Ok(()), assert_ready!(spawn(sender.send(42)).poll()));
254+
assert!(receiver.is_woken());
255+
assert_eq!(Some(42), assert_ready!(receiver.poll_next()));
256+
assert_pending!(receiver.poll_next());
257+
258+
drop(sender);
259+
assert!(receiver.is_woken());
260+
assert_eq!(None, assert_ready!(receiver.poll_next()));
261+
assert!(receiver.is_closed());
262+
}
263+
264+
#[test]
265+
fn test_receiver_notifies_sender() {
266+
let (mut sender, receiver) = channel::<i32>(1);
267+
268+
let mut receiver = spawn(receiver);
269+
assert_pending!(receiver.poll_next());
270+
271+
assert_eq!(Ok(()), assert_ready!(spawn(sender.send(41)).poll()));
272+
let mut send = spawn(sender.send(42));
273+
assert_pending!(send.poll());
274+
275+
assert!(receiver.is_woken());
276+
assert_eq!(Some(41), assert_ready!(receiver.poll_next()));
277+
assert_pending!(receiver.poll_next());
278+
279+
assert!(send.is_woken());
280+
assert_eq!(Ok(()), assert_ready!(send.poll()));
281+
drop(send);
282+
283+
assert!(receiver.is_woken());
284+
assert_eq!(Some(42), assert_ready!(receiver.poll_next()));
285+
286+
assert_eq!(Ok(()), assert_ready!(spawn(sender.send(43)).poll()));
287+
let mut send = spawn(sender.send(44));
288+
assert_pending!(send.poll());
289+
290+
drop(receiver);
291+
assert!(send.is_woken());
292+
assert_eq!(Err(SendError::Closed(44)), assert_ready!(send.poll()));
293+
drop(send);
294+
assert!(sender.is_closed());
295+
}
296+
}

src/sync/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
pub mod channel;
1+
pub mod bounded;
22
pub mod condvar;
33
pub mod oneshot;
44
pub mod semaphore;
55
mod shared_state;
6+
pub mod unbounded;

src/sync/oneshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct Sender<T>(StateRc<T>);
3030

3131
pub struct Receiver<T>(StateRc<T>);
3232

33-
pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
33+
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
3434
let state = SharedState::new(Data {
3535
value: Cell::new(None),
3636
has_sender: Cell::new(true),

0 commit comments

Comments
 (0)