Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![feature(thread_raw)]

use std::{
future::{Future, IntoFuture},
sync::Arc,
task::{Context, Poll, Wake, Waker},
thread,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread::{self, Thread},
};

thread_local! {
// A local reusable signal for each thread.
static LOCAL_THREAD_SIGNAL: Arc<Signal> = Arc::new(Signal {
owning_thread: thread::current(),
});
}

#[cfg(feature = "macro")]
pub use pollster_macro::{main, test};

Expand All @@ -41,20 +34,37 @@ pub trait FutureExt: Future {

impl<F: Future> FutureExt for F {}

struct Signal {
/// The thread that owns the signal.
owning_thread: thread::Thread,
}

impl Wake for Signal {
fn wake(self: Arc<Self>) {
self.owning_thread.unpark();
}

fn wake_by_ref(self: &Arc<Self>) {
self.owning_thread.unpark();
}
}
static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
// clone
|ptr| unsafe {
// Reify ownership of `Thread`
// TODO: It is not entirely clear if this is legal in a multi-threaded context
let thread = Thread::from_raw(ptr);
// Clone the `Thread`
let clone = thread.clone();
// Forget the previously owned instance, since cloning doesn't take ownership
core::mem::forget(thread);
RawWaker::new(clone.into_raw(), &RAW_WAKER_VTABLE)
},
// wake - take ownership and unpark
|ptr| unsafe {
Thread::from_raw(ptr).unpark();
},
// wake_by_ref
|ptr| unsafe {
// Reify ownership of `Thread`
// TODO: It is not entirely clear if this is legal in a multi-threaded context
let thread = Thread::from_raw(ptr);
// Clone the `Thread`, unpark the clone
thread.clone().unpark();
// Forget the previously owned instance, since wake_by_ref doesn't take ownership
core::mem::forget(thread);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clone() above will increment the reference count, but this forget prevents it from being decremented, so this will leak the Thread internals.

The clone() above has to be removed for this function to be leak-free.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. If I ever get round to work on this again I'll correct this.

},
// drop - take ownership and drop
|ptr| unsafe {
Thread::from_raw(ptr);
},
);

/// Block the thread until the future is ready.
///
Expand All @@ -67,18 +77,17 @@ impl Wake for Signal {
pub fn block_on<F: IntoFuture>(fut: F) -> F::Output {
let mut fut = core::pin::pin!(fut.into_future());

// A signal used to wake up the thread for polling as the future moves to completion.
LOCAL_THREAD_SIGNAL.with(|signal| {
// Create a waker and a context to be passed to the future.
let waker = Waker::from(Arc::clone(signal));
let mut context = Context::from_waker(&waker);
// Create a waker and a context to be passed to the future.
// SAFETY: Use of `Thread::from_raw` and `Thread::into_raw` is
// thread-safe and respects ownership requirements.
let waker = unsafe { Waker::new(thread::current().into_raw(), &RAW_WAKER_VTABLE) };
let mut context = Context::from_waker(&waker);

// Poll the future to completion.
loop {
match fut.as_mut().poll(&mut context) {
Poll::Pending => thread::park(),
Poll::Ready(item) => break item,
}
// Poll the future to completion.
loop {
match fut.as_mut().poll(&mut context) {
Poll::Pending => thread::park(),
Poll::Ready(item) => break item,
}
})
}
}
Loading