diff --git a/Cargo.toml b/Cargo.toml index a583400..a263c57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ license = "Apache-2.0/MIT" readme = "README.md" [features] +default = ["std"] macro = ["pollster-macro"] +std = [] [dependencies] pollster-macro = { version = "0.1", path = "macro", optional = true } diff --git a/src/lib.rs b/src/lib.rs index d864467..ed44563 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,17 @@ #![doc = include_str!("../README.md")] #![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![cfg_attr(not(feature = "std"), no_std)] -use std::{ +use core::{ future::Future, - sync::{Arc, Condvar, Mutex}, - task::{Context, Poll, Wake, Waker}, + mem, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, +}; + +#[cfg(feature = "std")] +use std::{ + sync::Arc, + thread::{self, Thread}, }; #[cfg(feature = "macro")] @@ -23,109 +30,204 @@ pub trait FutureExt: Future { /// /// let result = my_fut.block_on(); /// ``` - fn block_on(self) -> Self::Output where Self: Sized { block_on(self) } + fn block_on(self) -> Self::Output + where + Self: Sized, + { + block_on(self) + } + + /// Block the thread until the future is ready with custom thread parking implementation. + /// + /// This allows one to use custom thread parking mechanisms in `no_std` environments. + /// + /// # Example + /// + /// ``` + /// use pollster::FutureExt as _; + /// + /// let my_fut = async {}; + /// + /// let result = my_fut.block_on_t::<()>(); + /// ``` + fn block_on_t(self) -> Self::Output + where + Self: Sized, + { + block_on_t::(self) + } } impl FutureExt for F {} -enum SignalState { - Empty, - Waiting, - Notified, +/// Parkable handle. +/// +/// This handle allows a thread to potentially be efficiently blocked. This is used in the polling +/// implementation to wait for wakeups. +/// +/// The interface models that of `std::thread`, and many functions, such as +/// [`current`](Parkable::current), [`park`](Parkable::park), and [`unpark`](Parkable::unpark) +/// map to `std::thread` equivalents. +pub trait Parkable: Sized + Clone { + /// Get handle to current thread. + fn current() -> Self; + + /// Park the current thread. + fn park(); + + /// Unpark specified thread. + fn unpark(&self); + + /// Convert self into opaque pointer. + /// + /// This requires `Self` to either be layout compatible with `*const ()`. + fn into_opaque(self) -> *const (); + + /// Convert opaque pointer into `Self`. + /// + /// # Safety + /// + /// The implementation is safe if: + /// + /// - `data` argument is a valid park handle created by `Self::into_opaque`. + /// - `assert_eq!(into_opaque(from_opaque(data)), data)` holds for any data pointer. + unsafe fn from_opaque(data: *const ()) -> Self; + + /// Create a waker out of `self` + /// + /// This function will clone self and build a `Waker` object. + fn waker(&self) -> Waker { + let data = self.clone().into_opaque(); + // SAFETY: `RawWaker` created by `raw_waker` builds a waker object out of the raw data and + // vtable methods of this type which we assume are correct. + unsafe { Waker::from_raw(raw_waker::(data)) } + } } -struct Signal { - state: Mutex, - cond: Condvar, +pub type DefaultHandle = Arc; +#[cfg(not(feature = "std"))] +pub type DefaultHandle = (); + +fn raw_waker(data: *const ()) -> RawWaker { + RawWaker::new( + data, + &RawWakerVTable::new( + clone_waker::, + wake::, + wake_by_ref::, + drop_waker::, + ), + ) } -impl Signal { - fn new() -> Self { - Self { - state: Mutex::new(SignalState::Empty), - cond: Condvar::new(), - } +unsafe fn clone_waker(data: *const ()) -> RawWaker { + let waker = T::from_opaque(data); + let _ = T::into_opaque(waker.clone()); + let _ = T::into_opaque(waker); + raw_waker::(data) +} + +unsafe fn wake(data: *const ()) { + let waker = T::from_opaque(data); + waker.unpark(); +} + +unsafe fn wake_by_ref(data: *const ()) { + let waker = T::from_opaque(data); + waker.unpark(); + let _ = T::into_opaque(waker); +} + +unsafe fn drop_waker(data: *const ()) { + let _ = T::from_opaque(data); +} + +// Needs to be an Arc, because converting Thread to *const () is UB +// See: https://github.com/zesterer/pollster/pull/19 +// +// This may be switched once `thread_raw` features is merged and stabilized. See: +// - https://github.com/rust-lang/libs-team/issues/200 +// - https://github.com/rust-lang/rust/pull/97524 +#[cfg(feature = "std")] +impl Parkable for Arc { + fn current() -> Self { + thread::current().into() } - fn wait(&self) { - let mut state = self.state.lock().unwrap(); - match *state { - // Notify() was called before we got here, consume it here without waiting and return immediately. - SignalState::Notified => *state = SignalState::Empty, - // This should not be possible because our signal is created within a function and never handed out to any - // other threads. If this is the case, we have a serious problem so we panic immediately to avoid anything - // more problematic happening. - SignalState::Waiting => { - unreachable!("Multiple threads waiting on the same signal: Open a bug report!"); - } - SignalState::Empty => { - // Nothing has happened yet, and we're the only thread waiting (as should be the case!). Set the state - // accordingly and begin polling the condvar in a loop until it's no longer telling us to wait. The - // loop prevents incorrect spurious wakeups. - *state = SignalState::Waiting; - while let SignalState::Waiting = *state { - state = self.cond.wait(state).unwrap(); - } - } - } + fn park() { + thread::park(); } - fn notify(&self) { - let mut state = self.state.lock().unwrap(); - match *state { - // The signal was already notified, no need to do anything because the thread will be waking up anyway - SignalState::Notified => {} - // The signal wasn't notified but a thread isn't waiting on it, so we can avoid doing unnecessary work by - // skipping the condvar and leaving behind a message telling the thread that a notification has already - // occurred should it come along in the future. - SignalState::Empty => *state = SignalState::Notified, - // The signal wasn't notified and there's a waiting thread. Reset the signal so it can be wait()'ed on again - // and wake up the thread. Because there should only be a single thread waiting, `notify_all` would also be - // valid. - SignalState::Waiting => { - *state = SignalState::Empty; - self.cond.notify_one(); - } - } + fn unpark(&self) { + Thread::unpark(&**self); + } + + fn into_opaque(self) -> *const () { + Arc::into_raw(self) as *const () + } + + unsafe fn from_opaque(data: *const ()) -> Self { + Arc::from_raw(data as *const Thread) } } -impl Wake for Signal { - fn wake(self: Arc) { - self.notify(); +impl Parkable for () { + fn current() -> Self {} + + fn park() { + core::hint::spin_loop() + } + + fn unpark(&self) {} + + fn into_opaque(self) -> *const () { + &() + } + + unsafe fn from_opaque(_: *const ()) -> Self { + () } } -/// Block the thread until the future is ready. +/// Block the thread until the future is ready with custom parking implementation. +/// +/// This allows one to use custom thread parking mechanisms in `no_std` environments. /// /// # Example /// /// ``` /// let my_fut = async {}; -/// let result = pollster::block_on(my_fut); +/// let result = pollster::block_on_t::<(), _>(my_fut); /// ``` -pub fn block_on(mut fut: F) -> F::Output { +pub fn block_on_t(mut fut: F) -> F::Output { // Pin the future so that it can be polled. // SAFETY: We shadow `fut` so that it cannot be used again. The future is now pinned to the stack and will not be // moved until the end of this scope. This is, incidentally, exactly what the `pin_mut!` macro from `pin_utils` // does. let mut fut = unsafe { std::pin::Pin::new_unchecked(&mut fut) }; - // Signal used to wake up the thread for polling as the future moves to completion. We need to use an `Arc` - // because, although the lifetime of `fut` is limited to this function, the underlying IO abstraction might keep - // the signal alive for far longer. `Arc` is a thread-safe way to allow this to happen. - // TODO: Investigate ways to reuse this `Arc`... perhaps via a `static`? - let signal = Arc::new(Signal::new()); + let handle = T::current(); - // Create a context that will be passed to the future. - let waker = Waker::from(Arc::clone(&signal)); + let waker: Waker = handle.waker(); let mut context = Context::from_waker(&waker); // Poll the future to completion loop { match fut.as_mut().poll(&mut context) { - Poll::Pending => signal.wait(), + Poll::Pending => T::park(), Poll::Ready(item) => break item, } } } + +/// Block the thread until the future is ready. +/// +/// # Example +/// +/// ``` +/// let my_fut = async {}; +/// let result = pollster::block_on(my_fut); +/// ``` +pub fn block_on(fut: F) -> F::Output { + return block_on_t::(fut); +}