diff --git a/crates/core/src/abi.rs b/crates/core/src/abi.rs index 177a9ec04..6e866671b 100644 --- a/crates/core/src/abi.rs +++ b/crates/core/src/abi.rs @@ -554,6 +554,10 @@ def_instruction! { blocks: usize, } : [1] => [0], + /// Deallocates the language-specific handle representation on the top + /// of the stack. Used for async imports. + DropHandle { ty: &'a Type } : [1] => [0], + /// Call `task.return` for an async-lifted export. /// /// This will call core wasm import `name` which will be mapped to @@ -783,33 +787,37 @@ pub fn post_return(resolve: &Resolve, func: &Function, bindgen: &mut impl Bindge /// a list or a string primarily. pub fn guest_export_needs_post_return(resolve: &Resolve, func: &Function) -> bool { func.result - .map(|t| needs_post_return(resolve, &t)) + .map(|t| needs_deallocate(resolve, &t, Deallocate::Lists)) .unwrap_or(false) } -fn needs_post_return(resolve: &Resolve, ty: &Type) -> bool { +fn needs_deallocate(resolve: &Resolve, ty: &Type, what: Deallocate) -> bool { match ty { Type::String => true, Type::ErrorContext => true, Type::Id(id) => match &resolve.types[*id].kind { TypeDefKind::List(_) => true, - TypeDefKind::Type(t) => needs_post_return(resolve, t), - TypeDefKind::Handle(_) => false, + TypeDefKind::Type(t) => needs_deallocate(resolve, t, what), + TypeDefKind::Handle(Handle::Own(_)) => what.handles(), + TypeDefKind::Handle(Handle::Borrow(_)) => false, TypeDefKind::Resource => false, - TypeDefKind::Record(r) => r.fields.iter().any(|f| needs_post_return(resolve, &f.ty)), - TypeDefKind::Tuple(t) => t.types.iter().any(|t| needs_post_return(resolve, t)), + TypeDefKind::Record(r) => r + .fields + .iter() + .any(|f| needs_deallocate(resolve, &f.ty, what)), + TypeDefKind::Tuple(t) => t.types.iter().any(|t| needs_deallocate(resolve, t, what)), TypeDefKind::Variant(t) => t .cases .iter() .filter_map(|t| t.ty.as_ref()) - .any(|t| needs_post_return(resolve, t)), - TypeDefKind::Option(t) => needs_post_return(resolve, t), + .any(|t| needs_deallocate(resolve, t, what)), + TypeDefKind::Option(t) => needs_deallocate(resolve, t, what), TypeDefKind::Result(t) => [&t.ok, &t.err] .iter() .filter_map(|t| t.as_ref()) - .any(|t| needs_post_return(resolve, t)), + .any(|t| needs_deallocate(resolve, t, what)), TypeDefKind::Flags(_) | TypeDefKind::Enum(_) => false, - TypeDefKind::Future(_) | TypeDefKind::Stream(_) => false, + TypeDefKind::Future(_) | TypeDefKind::Stream(_) => what.handles(), TypeDefKind::Unknown => unreachable!(), }, @@ -836,7 +844,18 @@ pub fn deallocate_lists_in_types( ptr: B::Operand, bindgen: &mut B, ) { - Generator::new(resolve, bindgen).deallocate_lists_in_types(types, ptr); + Generator::new(resolve, bindgen).deallocate_in_types(types, ptr, Deallocate::Lists); +} + +/// Generate instructions in `bindgen` to deallocate all lists in `ptr` where +/// that's a pointer to a sequence of `types` stored in linear memory. +pub fn deallocate_lists_and_own_in_types( + resolve: &Resolve, + types: &[Type], + ptr: B::Operand, + bindgen: &mut B, +) { + Generator::new(resolve, bindgen).deallocate_in_types(types, ptr, Deallocate::ListsAndOwn); } #[derive(Copy, Clone)] @@ -845,6 +864,25 @@ pub enum Realloc { Export(&'static str), } +/// What to deallocate in various `deallocate_*` methods. +#[derive(Copy, Clone)] +enum Deallocate { + /// Only deallocate lists. + Lists, + /// Deallocate lists and owned resources such as `own` and + /// futures/streams. + ListsAndOwn, +} + +impl Deallocate { + fn handles(&self) -> bool { + match self { + Deallocate::Lists => false, + Deallocate::ListsAndOwn => true, + } + } +} + struct Generator<'a, B: Bindgen> { bindgen: &'a mut B, resolve: &'a Resolve, @@ -1166,14 +1204,14 @@ impl<'a, B: Bindgen> Generator<'a, B> { let mut types = Vec::new(); types.extend(func.result); - self.deallocate_lists_in_types(&types, addr); + self.deallocate_in_types(&types, addr, Deallocate::Lists); self.emit(&Instruction::Return { func, amt: 0 }); } - fn deallocate_lists_in_types(&mut self, types: &[Type], addr: B::Operand) { + fn deallocate_in_types(&mut self, types: &[Type], addr: B::Operand, what: Deallocate) { for (offset, ty) in self.bindgen.sizes().field_offsets(types) { - self.deallocate(ty, addr.clone(), offset); + self.deallocate(ty, addr.clone(), offset, what); } assert!( @@ -1973,12 +2011,18 @@ impl<'a, B: Bindgen> Generator<'a, B> { }); } - fn deallocate(&mut self, ty: &Type, addr: B::Operand, offset: ArchitectureSize) { + fn deallocate( + &mut self, + ty: &Type, + addr: B::Operand, + offset: ArchitectureSize, + what: Deallocate, + ) { use Instruction::*; // No need to execute any instructions if this type itself doesn't // require any form of post-return. - if !needs_post_return(self.resolve, ty) { + if !needs_deallocate(self.resolve, ty, what) { return; } @@ -2008,7 +2052,7 @@ impl<'a, B: Bindgen> Generator<'a, B> { | Type::ErrorContext => {} Type::Id(id) => match &self.resolve.types[id].kind { - TypeDefKind::Type(t) => self.deallocate(t, addr, offset), + TypeDefKind::Type(t) => self.deallocate(t, addr, offset, what), TypeDefKind::List(element) => { self.stack.push(addr.clone()); @@ -2021,30 +2065,36 @@ impl<'a, B: Bindgen> Generator<'a, B> { self.push_block(); self.emit(&IterBasePointer); let elemaddr = self.stack.pop().unwrap(); - self.deallocate(element, elemaddr, Default::default()); + self.deallocate(element, elemaddr, Default::default(), what); self.finish_block(0); self.emit(&Instruction::GuestDeallocateList { element }); } - TypeDefKind::Handle(_) => { - todo!() + TypeDefKind::Handle(Handle::Own(_)) + | TypeDefKind::Future(_) + | TypeDefKind::Stream(_) + if what.handles() => + { + self.read_from_memory(ty, addr, offset); + self.emit(&DropHandle { ty }); } - TypeDefKind::Resource => { - todo!() - } + TypeDefKind::Handle(Handle::Own(_)) => unreachable!(), + TypeDefKind::Handle(Handle::Borrow(_)) => unreachable!(), + TypeDefKind::Resource => unreachable!(), TypeDefKind::Record(record) => { self.deallocate_fields( &record.fields.iter().map(|f| f.ty).collect::>(), addr, offset, + what, ); } TypeDefKind::Tuple(tuple) => { - self.deallocate_fields(&tuple.types, addr, offset); + self.deallocate_fields(&tuple.types, addr, offset, what); } TypeDefKind::Flags(_) => {} @@ -2055,6 +2105,7 @@ impl<'a, B: Bindgen> Generator<'a, B> { addr, variant.tag(), variant.cases.iter().map(|c| c.ty.as_ref()), + what, ); self.emit(&GuestDeallocateVariant { blocks: variant.cases.len(), @@ -2062,19 +2113,25 @@ impl<'a, B: Bindgen> Generator<'a, B> { } TypeDefKind::Option(t) => { - self.deallocate_variant(offset, addr, Int::U8, [None, Some(t)]); + self.deallocate_variant(offset, addr, Int::U8, [None, Some(t)], what); self.emit(&GuestDeallocateVariant { blocks: 2 }); } TypeDefKind::Result(e) => { - self.deallocate_variant(offset, addr, Int::U8, [e.ok.as_ref(), e.err.as_ref()]); + self.deallocate_variant( + offset, + addr, + Int::U8, + [e.ok.as_ref(), e.err.as_ref()], + what, + ); self.emit(&GuestDeallocateVariant { blocks: 2 }); } TypeDefKind::Enum(_) => {} - TypeDefKind::Future(_) => todo!("read future from memory"), - TypeDefKind::Stream(_) => todo!("read stream from memory"), + TypeDefKind::Future(_) => unreachable!(), + TypeDefKind::Stream(_) => unreachable!(), TypeDefKind::Unknown => unreachable!(), }, } @@ -2086,6 +2143,7 @@ impl<'a, B: Bindgen> Generator<'a, B> { addr: B::Operand, tag: Int, cases: impl IntoIterator> + Clone, + what: Deallocate, ) { self.stack.push(addr.clone()); self.load_intrepr(offset, tag); @@ -2093,15 +2151,21 @@ impl<'a, B: Bindgen> Generator<'a, B> { for ty in cases { self.push_block(); if let Some(ty) = ty { - self.deallocate(ty, addr.clone(), payload_offset); + self.deallocate(ty, addr.clone(), payload_offset, what); } self.finish_block(0); } } - fn deallocate_fields(&mut self, tys: &[Type], addr: B::Operand, offset: ArchitectureSize) { + fn deallocate_fields( + &mut self, + tys: &[Type], + addr: B::Operand, + offset: ArchitectureSize, + what: Deallocate, + ) { for (field_offset, ty) in self.bindgen.sizes().field_offsets(tys) { - self.deallocate(ty, addr.clone(), offset + (field_offset)); + self.deallocate(ty, addr.clone(), offset + (field_offset), what); } } } diff --git a/crates/csharp/src/function.rs b/crates/csharp/src/function.rs index 4b68c0a43..78a084ea2 100644 --- a/crates/csharp/src/function.rs +++ b/crates/csharp/src/function.rs @@ -1262,7 +1262,9 @@ impl Bindgen for FunctionBindgen<'_, '_> { | Instruction::StreamLower { .. } | Instruction::StreamLift { .. } | Instruction::ErrorContextLower { .. } - | Instruction::ErrorContextLift { .. } => todo!(), + | Instruction::ErrorContextLift { .. } + | Instruction::DropHandle { .. } + => todo!(), } } diff --git a/crates/guest-rust/rt/src/async_support.rs b/crates/guest-rust/rt/src/async_support.rs index 1371f486f..ae9270337 100644 --- a/crates/guest-rust/rt/src/async_support.rs +++ b/crates/guest-rust/rt/src/async_support.rs @@ -10,6 +10,7 @@ use std::boxed::Box; use std::collections::HashMap; use std::ffi::c_void; use std::future::Future; +use std::mem; use std::pin::Pin; use std::ptr; use std::sync::Arc; @@ -115,7 +116,9 @@ impl FutureState { !self.waitables.is_empty() } - fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> u32 { + /// Handles the `event{0,1,2}` event codes and returns a corresponding + /// return code along with a flag whether this future is "done" or not. + fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) { match event0 { EVENT_NONE => rtdebug!("EVENT_NONE"), EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"), @@ -123,6 +126,15 @@ impl FutureState { EVENT_STREAM_WRITE => rtdebug!("EVENT_STREAM_WRITE({event1:#x}, {event2:#x})"), EVENT_FUTURE_READ => rtdebug!("EVENT_FUTURE_READ({event1:#x}, {event2:#x})"), EVENT_FUTURE_WRITE => rtdebug!("EVENT_FUTURE_WRITE({event1:#x}, {event2:#x})"), + EVENT_CANCEL => { + rtdebug!("EVENT_CANCEL"); + + // Cancellation is mapped to destruction in Rust, so return a + // code/bool indicating we're done. The caller will then + // appropriately deallocate this `FutureState` which will + // transitively run all destructors. + return (CALLBACK_CODE_EXIT, true); + } _ => unreachable!(), } if event0 != EVENT_NONE { @@ -145,7 +157,59 @@ impl FutureState { /// Poll this task until it either completes or can't make immediate /// progress. - fn poll(&mut self) -> u32 { + /// + /// Returns the code representing what happened along with a boolean as to + /// whether this execution is done. + fn poll(&mut self) -> (u32, bool) { + self.with_p3_task_set(|me| { + let mut context = Context::from_waker(&me.waker_clone); + + loop { + // Reset the waker before polling to clear out any pending + // notification, if any. + me.waker.0.store(false, Ordering::Relaxed); + + // Poll our future, handling `SPAWNED` around this. + let poll; + unsafe { + poll = me.tasks.poll_next_unpin(&mut context); + if !SPAWNED.is_empty() { + me.tasks.extend(SPAWNED.drain(..)); + } + } + + match poll { + // A future completed, yay! Keep going to see if more have + // completed. + Poll::Ready(Some(())) => (), + + // The `FuturesUnordered` list is empty meaning that there's no + // more work left to do, so we're done. + Poll::Ready(None) => { + assert!(!me.remaining_work()); + assert!(me.tasks.is_empty()); + break (CALLBACK_CODE_EXIT, true); + } + + // Some future within `FuturesUnordered` is not ready yet. If + // our `waker` was signaled then that means this is a yield + // operation, otherwise it means we're blocking on something. + Poll::Pending => { + assert!(!me.tasks.is_empty()); + if me.waker.0.load(Ordering::Relaxed) { + break (CALLBACK_CODE_YIELD, false); + } + + assert!(me.remaining_work()); + let waitable = me.waitable_set.as_ref().unwrap().as_raw(); + break (CALLBACK_CODE_WAIT | (waitable << 4), false); + } + } + } + }) + } + + fn with_p3_task_set(&mut self, f: impl FnOnce(&mut Self) -> R) -> R { // Finish our `wasip3_task` by initializing its self-referential pointer, // and then register it for the duration of this function with // `wasip3_task_set`. The previous value of `wasip3_task_set` will get @@ -163,49 +227,21 @@ impl FutureState { let prev = unsafe { cabi::wasip3_task_set(&mut self.wasip3_task) }; let _reset = ResetTask(prev); - let mut context = Context::from_waker(&self.waker_clone); - - loop { - // Reset the waker before polling to clear out any pending - // notification, if any. - self.waker.0.store(false, Ordering::Relaxed); - - // Poll our future, handling `SPAWNED` around this. - let poll; - unsafe { - poll = self.tasks.poll_next_unpin(&mut context); - if !SPAWNED.is_empty() { - self.tasks.extend(SPAWNED.drain(..)); - } - } - - match poll { - // A future completed, yay! Keep going to see if more have - // completed. - Poll::Ready(Some(())) => (), - - // The `FuturesUnordered` list is empty meaning that there's no - // more work left to do, so we're done. - Poll::Ready(None) => { - assert!(!self.remaining_work()); - assert!(self.tasks.is_empty()); - break CALLBACK_CODE_EXIT; - } - - // Some future within `FuturesUnordered` is not ready yet. If - // our `waker` was signaled then that means this is a yield - // operation, otherwise it means we're blocking on something. - Poll::Pending => { - assert!(!self.tasks.is_empty()); - if self.waker.0.load(Ordering::Relaxed) { - break CALLBACK_CODE_YIELD; - } + f(self) + } +} - assert!(self.remaining_work()); - let waitable = self.waitable_set.as_ref().unwrap().as_raw(); - break CALLBACK_CODE_WAIT | (waitable << 4); - } - } +impl Drop for FutureState { + fn drop(&mut self) { + // If this state has active tasks then they need to be dropped which may + // execute arbitrary code. This arbitrary code might require the p3 APIs + // for managing waitables, notably around removing them. In this + // situation we ensure that the p3 task is set while futures are being + // destroyed. + if !self.tasks.is_empty() { + self.with_p3_task_set(|me| { + me.tasks = Default::default(); + }) } } } @@ -258,6 +294,7 @@ const EVENT_STREAM_READ: u32 = 2; const EVENT_STREAM_WRITE: u32 = 3; const EVENT_FUTURE_READ: u32 = 4; const EVENT_FUTURE_WRITE: u32 = 5; +const EVENT_CANCEL: u32 = 6; const CALLBACK_CODE_EXIT: u32 = 0; const CALLBACK_CODE_YIELD: u32 = 1; @@ -267,6 +304,8 @@ const _CALLBACK_CODE_POLL: u32 = 3; const STATUS_STARTING: u32 = 0; const STATUS_STARTED: u32 = 1; const STATUS_RETURNED: u32 = 2; +const STATUS_STARTED_CANCELLED: u32 = 3; +const STATUS_RETURNED_CANCELLED: u32 = 4; const BLOCKED: u32 = 0xffff_ffff; const COMPLETED: u32 = 0x0; @@ -351,8 +390,8 @@ pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 { // our future so deallocate it. Otherwise put our future back in // context-local storage and forward the code. unsafe { - let rc = (*state).callback(event0, event1, event2); - if rc == CALLBACK_CODE_EXIT { + let (rc, done) = (*state).callback(event0, event1, event2); + if done { drop(Box::from_raw(state)); } else { context_set(state.cast()); @@ -382,8 +421,8 @@ pub fn block_on(future: impl Future + 'static) -> T { let mut event = (EVENT_NONE, 0, 0); loop { match state.callback(event.0, event.1, event.2) { - CALLBACK_CODE_EXIT => break rx.try_recv().unwrap().unwrap(), - CALLBACK_CODE_YIELD => event = state.waitable_set.as_ref().unwrap().poll(), + (_, true) => break rx.try_recv().unwrap().unwrap(), + (CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(), _ => event = state.waitable_set.as_ref().unwrap().wait(), } } @@ -461,3 +500,38 @@ unsafe fn context_set(value: *mut u8) { unsafe { set(value) } } + +#[doc(hidden)] +pub struct TaskCancelOnDrop { + _priv: (), +} + +impl TaskCancelOnDrop { + #[doc(hidden)] + pub fn new() -> TaskCancelOnDrop { + TaskCancelOnDrop { _priv: () } + } + + #[doc(hidden)] + pub fn forget(self) { + mem::forget(self); + } +} + +impl Drop for TaskCancelOnDrop { + fn drop(&mut self) { + #[cfg(not(target_arch = "wasm32"))] + unsafe fn cancel() { + unreachable!() + } + + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "[export]$root")] + extern "C" { + #[link_name = "[task-cancel]"] + fn cancel(); + } + + unsafe { cancel() } + } +} diff --git a/crates/guest-rust/rt/src/async_support/subtask.rs b/crates/guest-rust/rt/src/async_support/subtask.rs index aaf51425f..85905a87b 100644 --- a/crates/guest-rust/rt/src/async_support/subtask.rs +++ b/crates/guest-rust/rt/src/async_support/subtask.rs @@ -8,7 +8,10 @@ //! safe. use crate::async_support::waitable::{WaitableOp, WaitableOperation}; -use crate::async_support::{STATUS_RETURNED, STATUS_STARTED, STATUS_STARTING}; +use crate::async_support::{ + STATUS_RETURNED, STATUS_RETURNED_CANCELLED, STATUS_STARTED, STATUS_STARTED_CANCELLED, + STATUS_STARTING, +}; use crate::Cleanup; use std::alloc::Layout; use std::future::Future; @@ -50,6 +53,10 @@ pub unsafe trait Subtask { /// `dst`. unsafe fn params_dealloc_lists(dst: *mut u8); + /// Bindings-generated version of deallocating not only owned lists within + /// `src` but also deallocating any owned resources. + unsafe fn params_dealloc_lists_and_own(src: *mut u8); + /// Bindings-generated version of lifting the results stored at `src`. unsafe fn results_lift(src: *mut u8) -> Self::Results; @@ -59,7 +66,15 @@ pub unsafe trait Subtask { where Self: Sized, { - WaitableOperation::>::new(Start { params }) + async { + match WaitableOperation::>::new(Start { params }).await { + Ok(results) => results, + Err(_) => unreachable!( + "cancellation is not exposed API-wise, \ + should not be possible" + ), + } + } } } @@ -72,8 +87,8 @@ struct Start { unsafe impl WaitableOp for SubtaskOps { type Start = Start; type InProgress = InProgress; - type Result = T::Results; - type Cancel = (); + type Result = Result; + type Cancel = Result; fn start(state: Self::Start) -> (u32, Self::InProgress) { unsafe { @@ -97,7 +112,9 @@ unsafe impl WaitableOp for SubtaskOps { } } - fn start_cancelled(_state: Self::Start) -> Self::Cancel {} + fn start_cancelled(_state: Self::Start) -> Self::Cancel { + Err(()) + } fn in_progress_update( mut state: Self::InProgress, @@ -130,8 +147,47 @@ unsafe impl WaitableOp for SubtaskOps { // Note that by dropping `state` here we'll both deallocate the // params/results storage area as well as the subtask handle // itself. - unsafe { Ok(T::results_lift(state.ptr_results())) } + unsafe { Ok(Ok(T::results_lift(state.ptr_results()))) } + } + + // This subtask was dropped which forced cancellation. Said + // cancellation stopped the subtask before it reached the "started" + // state, meaning that we still own all of the parameters in their + // lowered form. + // + // In this situation we lift the parameters, even after we + // previously lowered them, back into `T::Params`. That notably + // re-acquires ownership and is suitable for disposing of all of + // the parameters via normal Rust-based destructors. + STATUS_STARTED_CANCELLED => { + assert!(!state.started); + unsafe { + T::params_dealloc_lists_and_own(state.ptr_params()); + } + Ok(Err(())) + } + + // This subtask was dropped which forced cancellation. Said + // cancellation stopped the subtask before it reached the "returned" + // state, meaning that it started, received the arguments, but then + // did not complete. + // + // In this situation we may have already received `STATUS_STARTED`, + // but we also might not have. This means we conditionally need + // to flag this task as started which will deallocate all lists + // owned by the parameters. + // + // After that though we do not have ownership of the parameters any + // more (e.g. own resources are all gone) so there's nothing to + // return. Here we yield a result and dispose of the in-progress + // state. + STATUS_RETURNED_CANCELLED => { + if !state.started { + state.flag_started(); + } + Ok(Err(())) } + other => panic!("unknown code {other:#x}"), } } @@ -143,14 +199,12 @@ unsafe impl WaitableOp for SubtaskOps { state.subtask.as_ref().unwrap().handle.get() } - fn in_progress_cancel(_: &Self::InProgress) -> u32 { - // FIXME: plan is to implement cancellation in the canonical ABI in the - // near future, this will get filled out soon in theory. - trap_because_of_future_cancel() + fn in_progress_cancel(state: &Self::InProgress) -> u32 { + unsafe { cancel(Self::in_progress_waitable(state)) } } fn result_into_cancel(result: Self::Result) -> Self::Cancel { - drop(result); + result } } @@ -162,19 +216,7 @@ struct SubtaskHandle { impl Drop for SubtaskHandle { fn drop(&mut self) { unsafe { - subtask_drop(self.handle.get()); - } - - #[cfg(not(target_arch = "wasm32"))] - unsafe fn subtask_drop(_: u32) { - unreachable!() - } - - #[cfg(target_arch = "wasm32")] - #[link(wasm_import_module = "$root")] - extern "C" { - #[link_name = "[subtask-drop]"] - fn subtask_drop(handle: u32); + drop(self.handle.get()); } } } @@ -213,12 +255,21 @@ impl InProgress { } } -#[cold] -fn trap_because_of_future_cancel() -> ! { - panic!( - "an imported function is being dropped/cancelled before being fully \ - awaited, but that is not sound at this time so the program is going \ - to be aborted; for more information see \ - https://github.com/bytecodealliance/wit-bindgen/issues/1175" - ) +#[cfg(not(target_arch = "wasm32"))] +unsafe fn drop(_: u32) { + unreachable!() +} + +#[cfg(not(target_arch = "wasm32"))] +unsafe fn cancel(_: u32) -> u32 { + unreachable!() +} + +#[cfg(target_arch = "wasm32")] +#[link(wasm_import_module = "$root")] +extern "C" { + #[link_name = "[subtask-cancel]"] + fn cancel(handle: u32) -> u32; + #[link_name = "[subtask-drop]"] + fn drop(handle: u32); } diff --git a/crates/guest-rust/rt/src/async_support/waitable.rs b/crates/guest-rust/rt/src/async_support/waitable.rs index 62d32d684..65fcdde9b 100644 --- a/crates/guest-rust/rt/src/async_support/waitable.rs +++ b/crates/guest-rust/rt/src/async_support/waitable.rs @@ -89,9 +89,10 @@ pub unsafe trait WaitableOp { /// /// This method will transition from the `InProgress` state, with some /// status code that was received, to either a completed result or a new - /// `InProgress` state. This is invoked after an operation has started - /// whenever a new status code has been received by an async export's - /// `callback`, for example. + /// `InProgress` state. This is invoked when: + /// + /// * a new status code has been received by an async export's `callback` + /// * cancellation returned a code to be processed here fn in_progress_update( state: Self::InProgress, code: u32, @@ -108,6 +109,12 @@ pub unsafe trait WaitableOp { /// Initiates a request for cancellation of this operation. Returns the /// status code returned by the `{future,stream}.cancel-{read,write}` /// intrinsic. + /// + /// Note that this must synchronously complete the operation somehow. This + /// cannot return a status code indicating that an operation is pending, + /// instead the operation must be complete with the returned code. That may + /// mean that this intrinsic can block while figuring things out in the + /// component model ABI, for example. fn in_progress_cancel(state: &Self::InProgress) -> u32; /// Converts a "completion result" into a "cancel result". This is necessary @@ -265,7 +272,7 @@ where // Note that it's the responsibility of the completion callback at // the ABI level that we install to fill in this pointer, e.g. it's // part of the `register_waker` contract. - InProgress(_) => completion_status.code, + InProgress(_) => completion_status.code_mut().take(), // This write has already completed, it's a Rust-level API violation // to call this function again. @@ -287,7 +294,7 @@ where ) -> Poll { use WaitableOperationState::*; - let (state, completion_status) = self.as_mut().pin_project(); + let (state, _completion_status) = self.as_mut().pin_project(); // If a status code is provided, then extract the in-progress state and // see what it thinks about this code. If we're done, yay! If not then @@ -304,10 +311,6 @@ where Ok(result) => return Poll::Ready(result), Err(in_progress) => *state = InProgress(in_progress), } - - // Remove the previous completion status, if any, as we're no longer - // interested in it if it was present. - *completion_status.code_mut() = None; } let in_progress = match state { @@ -322,15 +325,6 @@ where if let Some(cx) = cx { let handle = S::in_progress_waitable(in_progress); self.register_waker(handle, cx); - } else { - // This should not be dynamically reachable and, if it were, it may - // mean that this needs to be re-thought and/or the caller should be - // adjusted. Conservatively panic for now to defer fleshing this out - // for later. - panic!( - "unexpected poll to completion in a non-future way (no context) \ - and this operation is still pending" - ); } Poll::Pending } @@ -349,7 +343,7 @@ where pub fn cancel(mut self: Pin<&mut Self>) -> S::Cancel { use WaitableOperationState::*; - let (state, _) = self.as_mut().pin_project(); + let (state, mut completion_status) = self.as_mut().pin_project(); let in_progress = match state { // This operation was never actually started, so there's no need to // cancel anything, just pull out the value and return it. @@ -370,30 +364,67 @@ where Done => panic!("cannot cancel operation after completing it"), }; - // This operation is currently actively in progress after being queued - // up in the past. In this situation we need to call - // `{future,stream}.cancel-{read,write}`. First ensure that our - // exported task's state is no longer interested in the write handle - // here, so unregister that. Next if a completion hasn't already come - // in due to some race then perform the actual cancellation here. - let waitable = S::in_progress_waitable(in_progress); - self.as_mut().unregister_waker(waitable); - let (InProgress(in_progress), mut completion_status) = self.as_mut().pin_project() else { - unreachable!() - }; - if completion_status.code.is_none() { - *completion_status.as_mut().code_mut() = Some(S::in_progress_cancel(in_progress)); + // Our operation is in-progress, let's take a look at the pending + // completion code, if any. + match completion_status.as_mut().code_mut().take() { + // A completion code, or status update, is available. This can + // happen for example if an export received a status update for + // this operation but then during the subsequent poll we decided + // that the future should be dropped instead, aka a race between + // two events. In this situation though to fully process the + // cancellation we need to see what's up, so check to see if the + // operation is done with this code. + // + // Note that in this branch it's known that this operation's waker + // is not registered with the exported task because the exported + // task already delivered us the completion code, which + // automatically deregisters it at this time. + Some(code) => { + match self.as_mut().poll_complete_with_code(None, Some(code)) { + // The operation completed without us needing to cancel it, + // so just convert that to the `Cancel` type. In this + // situation no cancellation is necessary, the async + // operation is now inert, and we can immediately return. + Poll::Ready(result) => return S::result_into_cancel(result), + + // The operation, despite receiving an update via a code, + // has not yet completed. In this case we do indeed need to + // perform cancellation, so fall through to below. + Poll::Pending => {} + } + } + + // A completion code is not yet available. In this situation we + // deregister our waker from the exported task's waitable set and + // callback handling since we'll be no longer waiting for events. + // Cancellation below happens synchronously. + // + // After we've unregistered fall through to below. + None => { + let waitable = S::in_progress_waitable(in_progress); + self.as_mut().unregister_waker(waitable); + } } - // Now that we're guaranteed to have a completion status, pass that - // through to "interpret the result". - let code = completion_status.code.unwrap(); + // This operation is guaranteed actively in progress at this point. + // That means we really do in fact need to cancel it. Here the + // appropriate cancellation intrinsic for the component model is + // invoked which returns the final completion status for this + // operation. + // + // The completion code is forwarded to `poll_complete_with_code` which + // determines what happened as a result. Note that at this time + // cancellation is required to be a synchronous operation in Rust, even + // if it's async in the component model, since that's the only way for + // this to be sound. Rust doesn't currently have linear types or async + // destructors for example to ensure otherwise that if this were to + // proceed asynchronously that we could rely on it being invoked. + let (InProgress(in_progress), _) = self.as_mut().pin_project() else { + unreachable!() + }; + let code = S::in_progress_cancel(in_progress); match self.poll_complete_with_code(None, Some(code)) { - // Leave it up to `S` to interpret the completion result as a - // cancellation result. Poll::Ready(result) => S::result_into_cancel(result), - - // Should not be reachable as we always pass `Some(code)`. Poll::Pending => unreachable!(), } } diff --git a/crates/guest-rust/rt/src/async_support/waitable_set.rs b/crates/guest-rust/rt/src/async_support/waitable_set.rs index 16fc023ff..8f4237591 100644 --- a/crates/guest-rust/rt/src/async_support/waitable_set.rs +++ b/crates/guest-rust/rt/src/async_support/waitable_set.rs @@ -1,6 +1,5 @@ //! Low-level FFI-like bindings around `waitable-set` in the canonical ABI. -use super::EVENT_NONE; use std::num::NonZeroU32; pub struct WaitableSet(NonZeroU32); diff --git a/crates/guest-rust/src/lib.rs b/crates/guest-rust/src/lib.rs index 5b9a31d68..95b91b1fd 100644 --- a/crates/guest-rust/src/lib.rs +++ b/crates/guest-rust/src/lib.rs @@ -895,7 +895,7 @@ pub mod rt { #[cfg(feature = "async")] pub use wit_bindgen_rt::async_support::{ - block_on, spawn, AbiBuffer, FutureRead, FutureReader, FutureWrite, FutureWriteCancel, - FutureWriteError, FutureWriter, StreamRead, StreamReader, StreamResult, StreamWrite, - StreamWriter, + backpressure_set, block_on, spawn, AbiBuffer, FutureRead, FutureReader, FutureWrite, + FutureWriteCancel, FutureWriteError, FutureWriter, StreamRead, StreamReader, StreamResult, + StreamWrite, StreamWriter, }; diff --git a/crates/moonbit/src/lib.rs b/crates/moonbit/src/lib.rs index bed338e08..d0d11d4ed 100644 --- a/crates/moonbit/src/lib.rs +++ b/crates/moonbit/src/lib.rs @@ -2666,7 +2666,8 @@ impl Bindgen for FunctionBindgen<'_, '_> { | Instruction::StreamLower { .. } | Instruction::StreamLift { .. } | Instruction::ErrorContextLower { .. } - | Instruction::ErrorContextLift { .. } => todo!(), + | Instruction::ErrorContextLift { .. } + | Instruction::DropHandle { .. } => todo!(), } } diff --git a/crates/rust/src/bindgen.rs b/crates/rust/src/bindgen.rs index 976b7caf8..419852d27 100644 --- a/crates/rust/src/bindgen.rs +++ b/crates/rust/src/bindgen.rs @@ -934,6 +934,7 @@ impl Bindgen for FunctionBindgen<'_, '_> { Instruction::AsyncTaskReturn { name, params } => { let func = self.declare_import(name, params, &[]); + uwriteln!(self.src, "_task_cancel.forget();"); uwriteln!(self.src, "{func}({});", operands.join(", ")); } @@ -1202,6 +1203,10 @@ impl Bindgen for FunctionBindgen<'_, '_> { align = align.format(POINTER_SIZE_EXPRESSION) )); } + + Instruction::DropHandle { .. } => { + uwriteln!(self.src, "let _ = {};", operands[0]); + } } } } diff --git a/crates/rust/src/interface.rs b/crates/rust/src/interface.rs index 9271bc9fa..60346d233 100644 --- a/crates/rust/src/interface.rs +++ b/crates/rust/src/interface.rs @@ -743,6 +743,12 @@ pub mod vtable{ordinal} {{ format!("unsafe {{ {} }}", String::from(f.src)) } + fn deallocate_lists_and_own(&mut self, address: &str, types: &[Type], module: &str) -> String { + let mut f = FunctionBindgen::new(self, Vec::new(), module, true); + abi::deallocate_lists_and_own_in_types(f.r#gen.resolve, types, address.into(), &mut f); + format!("unsafe {{ {} }}", String::from(f.src)) + } + fn lift_from_memory(&mut self, address: &str, ty: &Type, module: &str) -> String { let mut f = FunctionBindgen::new(self, Vec::new(), module, true); let result = abi::lift_from_memory(f.r#gen.resolve, &mut f, address.into(), ty); @@ -881,6 +887,15 @@ unsafe fn call_import(params: *mut u8, results: *mut u8) -> u32 {{ uwriteln!(self.src, "{dealloc_lists}"); uwriteln!(self.src, "}}"); + // Generate `fn params_dealloc_lists_and_own` + let dealloc_lists_and_own = self.deallocate_lists_and_own("_ptr", ¶m_tys, module); + uwriteln!( + self.src, + "unsafe fn params_dealloc_lists_and_own(_ptr: *mut u8) {{" + ); + uwriteln!(self.src, "{dealloc_lists_and_own}"); + uwriteln!(self.src, "}}"); + // Generate `fn params_lower` let mut lowers = Vec::new(); let offsets = self @@ -986,6 +1001,10 @@ unsafe fn call_import(params: *mut u8, results: *mut u8) -> u32 {{ if async_ { let async_support = self.r#gen.async_support_path(); uwriteln!(self.src, "{async_support}::start_task(async move {{"); + uwriteln!( + self.src, + "let _task_cancel = {async_support}::TaskCancelOnDrop::new();" + ); if needs_cleanup_list { let vec = self.path_to_vec(); uwriteln!(self.src, "let mut cleanup_list = {vec}::new();"); diff --git a/tests/runtime-async/async/cancel-import/runner.rs b/tests/runtime-async/async/cancel-import/runner.rs new file mode 100644 index 000000000..0a7d8bf25 --- /dev/null +++ b/tests/runtime-async/async/cancel-import/runner.rs @@ -0,0 +1,154 @@ +include!(env!("BINDINGS")); + +use crate::my::test::i::*; +use futures::task::noop_waker_ref; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +fn main() { + // Cancel an import in-progress + wit_bindgen::block_on(async { + let (tx, rx) = wit_future::new(); + let mut import = Box::pin(pending_import(rx)); + assert!(import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + drop(import); + tx.write(()).await.unwrap_err(); + }); + + // Cancel an import before it starts + wit_bindgen::block_on(async { + let (tx, rx) = wit_future::new(); + let import = Box::pin(pending_import(rx)); + drop(import); + tx.write(()).await.unwrap_err(); + }); + + // Cancel an import in the "started" state + wit_bindgen::block_on(async { + let (tx1, rx1) = wit_future::new(); + let (tx2, rx2) = wit_future::new(); + + // create a task in the "started" state, but don't complete it yet + let mut started_import = Box::pin(pending_import(rx1)); + assert!(started_import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + + // request the other component sets its backpressure flag meaning we + // won't be able to create new tasks in the "started" state. + backpressure_set(true); + let mut starting_import = Box::pin(pending_import(rx2)); + assert!(starting_import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + + // Now cancel the "starting" import. This should notably drop handles in + // arguments since they get re-acquired during cancellation + drop(starting_import); + + // cancel our in-progress export + drop(started_import); + + backpressure_set(false); + + // both channels should be closed + tx1.write(()).await.unwrap_err(); + tx2.write(()).await.unwrap_err(); + }); + + // Race an import's cancellation with a status code saying it's done. + wit_bindgen::block_on(async { + // Start a subtask and get it into the "started" state + let (tx, rx) = wit_future::new(); + let mut import = Box::pin(pending_import(rx)); + assert!(import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + + // Complete the subtask, but don't see the completion in Rust yet. + tx.write(()).await.unwrap(); + + // Let the subtask's completion notification make its way to our task + // here. + for _ in 0..5 { + yield_().await; + } + + // Now cancel the import, despite it actually being done. This should + // realize that the cancellation is racing completion. + drop(import); + }); + + // Race an import's cancellation with a pending status code indicating that + // it's transitioning from started => returned. + wit_bindgen::block_on(async { + // Start a subtask and get it into the "started" state + let (tx1, rx1) = wit_future::new(); + let mut started_import = Box::pin(pending_import(rx1)); + assert!(started_import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + + // force the next subtask to start out in the "starting" state, not the + // "started" state. + backpressure_set(true); + let (tx2, rx2) = wit_future::new(); + let mut starting_import = Box::pin(pending_import(rx2)); + assert!(starting_import + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_pending()); + + // Disable backpressure in the other component which will let the + // `starting_import`, previously in the "STARTING" state, get a queued up + // notification that it's entered the "STARTED" state. + backpressure_set(false); + for _ in 0..5 { + yield_().await; + } + + // Now cancel the `starting_import`. This should correctly pick up the + // STARTING => STARTED state transition and handle that correctly. + drop(starting_import); + + // Our future to the import we cancelled shouldn't be able to complete + // its write. + tx2.write(()).await.unwrap_err(); + + // Complete the other import normally just to assert that it's not + // cancelled and able to proceed as usual. + tx1.write(()).await.unwrap(); + started_import.await; + }); +} + +async fn yield_() { + #[derive(Default)] + struct Yield { + yielded: bool, + } + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<()> { + if self.yielded { + Poll::Ready(()) + } else { + self.yielded = true; + context.waker().wake_by_ref(); + Poll::Pending + } + } + } + + Yield::default().await; +} diff --git a/tests/runtime-async/async/cancel-import/test.rs b/tests/runtime-async/async/cancel-import/test.rs new file mode 100644 index 000000000..8dd4f3d62 --- /dev/null +++ b/tests/runtime-async/async/cancel-import/test.rs @@ -0,0 +1,17 @@ +use wit_bindgen::FutureReader; + +include!(env!("BINDINGS")); + +struct Component; + +export!(Component); + +impl crate::exports::my::test::i::Guest for Component { + async fn pending_import(x: FutureReader<()>) { + x.await.unwrap(); + } + + fn backpressure_set(x: bool) { + wit_bindgen::backpressure_set(x) + } +} diff --git a/tests/runtime-async/async/cancel-import/test.wit b/tests/runtime-async/async/cancel-import/test.wit new file mode 100644 index 000000000..c83ad0a28 --- /dev/null +++ b/tests/runtime-async/async/cancel-import/test.wit @@ -0,0 +1,14 @@ +package my:test; + +interface i { + pending-import: async func(x: future); + backpressure-set: func(x: bool); +} + +world test { + export i; +} + +world runner { + import i; +}