From b47ebfab178825fd7a2d981c0ebe0db76ad269bc Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Fri, 1 Aug 2025 15:16:04 -0600 Subject: [PATCH 1/3] use a single table per instance for resources, waitables, etc. Per https://github.com/WebAssembly/component-model/pull/513, the spec now puts resources, waitables, waitable sets, subtasks, and error contexts in the same table per instance. This updates the implementation to match. - Combine the `ResourceTable` and `StateTable` data structures into a single `HandleTable` structure - Rename `ComponentInstance::instance_resource_tables` to `instance_handle_tables` - Remove `ConcurrentState::waitable_tables` and `error_context_tables` in favor of the above - Move various associated functions from `ConcurrentState` to `ComponentInstance` so they can access `instance_resource_tables` While I was doing table-related things, I also updated `concurrent::Table::new` to reserve the zero handle to mean "invalid". This won't affect what the guest sees in any way, but it allows us to use `TableId::new(0)` to invalidate host-owned handles in e.g. `{Stream,Future}{Reader,Writer}::close`. Fixes #11189 Signed-off-by: Joel Dice Re-internalize `ResourceKind` to `mod handle_table` Remove `ResourceKind` Start flattening the representation of `Slot` Internalize `get_mut_handle_by_index` Internalize implementation details such as the representation of slots to and make methods a bit more targeted in their functionality. Internalize more details of `HandleTable` Don't expose `HandleKind` and of per-function methods for operating on the various kinds of handles that reside in the table. Flatten the representation of `Slot` There's still some more work to do for host/guest resource handles, but this helps realize the goal of the previous refactorings in the meantime. --- .../src/runtime/component/concurrent.rs | 724 ++++++------- .../component/concurrent/error_contexts.rs | 8 - .../concurrent/futures_and_streams.rs | 959 ++++++++---------- .../runtime/component/concurrent/states.rs | 143 --- .../src/runtime/component/concurrent/table.rs | 9 +- .../src/runtime/component/func/options.rs | 6 +- crates/wasmtime/src/runtime/store.rs | 6 +- crates/wasmtime/src/runtime/vm/component.rs | 28 +- .../src/runtime/vm/component/handle_table.rs | 665 ++++++++++++ .../src/runtime/vm/component/libcalls.rs | 32 +- .../src/runtime/vm/component/resources.rs | 197 +--- 11 files changed, 1461 insertions(+), 1316 deletions(-) delete mode 100644 crates/wasmtime/src/runtime/component/concurrent/states.rs create mode 100644 crates/wasmtime/src/runtime/vm/component/handle_table.rs diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 8e5af865b1e4..4637627012fd 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -51,16 +51,17 @@ use crate::component::func::{self, Func, Options}; use crate::component::{Component, ComponentInstanceId, HasData, HasSelf, Instance}; use crate::fiber::{self, StoreFiber, StoreFiberYield}; use crate::store::{StoreInner, StoreOpaque, StoreToken}; -use crate::vm::component::{CallContext, InstanceFlags, ResourceTables}; +use crate::vm::component::{ + CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState, +}; use crate::vm::{SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw}; use anyhow::{Context as _, Result, anyhow, bail}; -use error_contexts::{GlobalErrorContextRefCount, LocalErrorContextRefCount}; +use error_contexts::GlobalErrorContextRefCount; use futures::channel::oneshot; use futures::future::{self, Either, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; -use futures_and_streams::{FlatAbi, ReturnCode, StreamFutureState, TableIndex, TransmitHandle}; -use states::StateTable; +use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; use std::any::Any; use std::borrow::ToOwned; use std::boxed::Box; @@ -78,7 +79,6 @@ use std::sync::Mutex; use std::task::{Context, Poll, Waker}; use std::vec::Vec; use table::{Table, TableDebug, TableError, TableId}; -use wasmtime_environ::PrimaryMap; use wasmtime_environ::component::{ CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, @@ -100,7 +100,6 @@ pub(crate) use futures_and_streams::{ mod abort; mod error_contexts; mod futures_and_streams; -mod states; mod table; pub(crate) mod tls; @@ -549,21 +548,6 @@ where fn run(self, accessor: &Accessor) -> impl Future + Send; } -/// Represents the state of a waitable handle. -#[derive(Debug)] -enum WaitableState { - /// Represents a host task handle. - HostTask, - /// Represents a guest task handle. - GuestTask, - /// Represents a stream handle. - Stream(TypeStreamTableIndex, StreamFutureState), - /// Represents a future handle. - Future(TypeFutureTableIndex, StreamFutureState), - /// Represents a waitable-set handle. - Set, -} - /// Represents parameter and result metadata for the caller side of a /// guest->guest call orchestrated by a fused adapter. enum CallerInfo { @@ -724,103 +708,14 @@ impl fmt::Debug for WorkItem { } } -impl ConcurrentState { - fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState { - self.instance_states.entry(instance).or_default() - } - - fn push(&mut self, value: V) -> Result, TableError> { - self.table.push(value) - } - - fn get(&self, id: TableId) -> Result<&V, TableError> { - self.table.get(id) - } - - fn get_mut(&mut self, id: TableId) -> Result<&mut V, TableError> { - self.table.get_mut(id) - } - - pub fn add_child( - &mut self, - child: TableId, - parent: TableId, - ) -> Result<(), TableError> { - self.table.add_child(child, parent) - } - - pub fn remove_child( - &mut self, - child: TableId, - parent: TableId, - ) -> Result<(), TableError> { - self.table.remove_child(child, parent) - } - - fn delete(&mut self, id: TableId) -> Result { - self.table.delete(id) - } - - fn push_future(&mut self, future: HostTaskFuture) { - // Note that we can't directly push to `ConcurrentState::futures` here - // since this may be called from a future that's being polled inside - // `Self::poll_until`, which temporarily removes the `FuturesUnordered` - // so it has exclusive access while polling it. Therefore, we push a - // work item to the "high priority" queue, which will actually push to - // `ConcurrentState::futures` later. - self.push_high_priority(WorkItem::PushFuture(Mutex::new(future))); - } - - fn push_high_priority(&mut self, item: WorkItem) { - log::trace!("push high priority: {item:?}"); - self.high_priority.push(item); - } - - fn push_low_priority(&mut self, item: WorkItem) { - log::trace!("push low priority: {item:?}"); - self.low_priority.push(item); - } - - /// Determine whether the instance associated with the specified guest task - /// may be entered (i.e. is not already on the async call stack). - /// - /// This is an additional check on top of the "may_enter" instance flag; - /// it's needed because async-lifted exports with callback functions must - /// not call their own instances directly or indirectly, and due to the - /// "stackless" nature of callback-enabled guest tasks this may happen even - /// if there are no activation records on the stack (i.e. the "may_enter" - /// field is `true`) for that instance. - fn may_enter(&mut self, mut guest_task: TableId) -> bool { - let guest_instance = self.get(guest_task).unwrap().instance; - - // Walk the task tree back to the root, looking for potential - // reentrance. - // - // TODO: This could be optimized by maintaining a per-`GuestTask` bitset - // such that each bit represents and instance which has been entered by - // that task or an ancestor of that task, in which case this would be a - // constant time check. - loop { - match &self.get_mut(guest_task).unwrap().caller { - Caller::Host { .. } => break true, - Caller::Guest { task, instance } => { - if *instance == guest_instance { - break false; - } else { - guest_task = *task; - } - } - } - } - } - +impl ComponentInstance { /// Handle the `CallbackCode` returned from an async-lifted export or its /// callback. /// /// If `initial_call` is `true`, then the code was received from the /// async-lifted export; otherwise, it was received from its callback. fn handle_callback_code( - &mut self, + mut self: Pin<&mut Self>, guest_task: TableId, runtime_instance: RuntimeComponentInstanceIndex, code: u32, @@ -830,7 +725,8 @@ impl ConcurrentState { log::trace!("received callback code from {guest_task:?}: {code} (set: {set})"); - let task = self.get_mut(guest_task)?; + let state = self.as_mut().concurrent_state_mut(); + let task = state.get_mut(guest_task)?; if task.lift_result.is_some() { if code == callback_code::EXIT { @@ -840,7 +736,7 @@ impl ConcurrentState { // Notify any current or future waiters that this subtask has // started. Waitable::Guest(guest_task).set_event( - self, + state, Some(Event::Subtask { status: Status::Started, }), @@ -848,23 +744,19 @@ impl ConcurrentState { } } - let get_set = |instance: &mut Self, handle| { + let get_set = |instance: Pin<&mut Self>, handle| { if handle == 0 { bail!("invalid waitable-set handle"); } - let (set, WaitableState::Set) = - instance.waitable_tables[runtime_instance].get_mut_by_index(handle)? - else { - bail!("invalid waitable-set handle"); - }; + let set = instance.guest_tables().0[runtime_instance].waitable_set_rep(handle)?; Ok(TableId::::new(set)) }; match code { callback_code::EXIT => { - let task = self.get_mut(guest_task)?; + let task = state.get_mut(guest_task)?; match &task.caller { Caller::Host { remove_task_automatically, @@ -872,7 +764,7 @@ impl ConcurrentState { } => { if *remove_task_automatically { log::trace!("handle_callback_code will delete task {guest_task:?}"); - Waitable::Guest(guest_task).delete_from(self)?; + Waitable::Guest(guest_task).delete_from(state)?; } } Caller::Guest { .. } => { @@ -884,10 +776,10 @@ impl ConcurrentState { callback_code::YIELD => { // Push this task onto the "low priority" queue so it runs after // any other tasks have had a chance to run. - let task = self.get_mut(guest_task)?; + let task = state.get_mut(guest_task)?; assert!(task.event.is_none()); task.event = Some(Event::None); - self.push_low_priority(WorkItem::GuestCall(GuestCall { + state.push_low_priority(WorkItem::GuestCall(GuestCall { task: guest_task, kind: GuestCallKind::DeliverEvent { instance: runtime_instance, @@ -896,12 +788,14 @@ impl ConcurrentState { })); } callback_code::WAIT | callback_code::POLL => { - let set = get_set(self, set)?; + let set = get_set(self.as_mut(), set)?; + let state = self.concurrent_state_mut(); - if self.get_mut(guest_task)?.event.is_some() || !self.get_mut(set)?.ready.is_empty() + if state.get_mut(guest_task)?.event.is_some() + || !state.get_mut(set)?.ready.is_empty() { // An event is immediately available; deliver it ASAP. - self.push_high_priority(WorkItem::GuestCall(GuestCall { + state.push_high_priority(WorkItem::GuestCall(GuestCall { task: guest_task, kind: GuestCallKind::DeliverEvent { instance: runtime_instance, @@ -914,7 +808,7 @@ impl ConcurrentState { callback_code::POLL => { // We're polling, so just yield and check whether an // event has arrived after that. - self.push_low_priority(WorkItem::Poll(PollParams { + state.push_low_priority(WorkItem::Poll(PollParams { task: guest_task, instance: runtime_instance, set, @@ -927,9 +821,9 @@ impl ConcurrentState { // Here we also set `GuestTask::wake_on_cancel` // which allows `subtask.cancel` to interrupt the // wait. - let old = self.get_mut(guest_task)?.wake_on_cancel.replace(set); + let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set); assert!(old.is_none()); - let old = self + let old = state .get_mut(set)? .waiting .insert(guest_task, WaitMode::Callback(runtime_instance)); @@ -945,78 +839,38 @@ impl ConcurrentState { Ok(()) } - /// Record that we're about to enter a (sub-)component instance which does - /// not support more than one concurrent, stackful activation, meaning it - /// cannot be entered again until the next call returns. - fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) { - self.instance_state(instance).do_not_enter = true; - } - - /// Record that we've exited a (sub-)component instance previously entered - /// with `Self::enter_instance` and then calls `Self::partition_pending`. - /// See the documentation for the latter for details. - fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { - self.instance_state(instance).do_not_enter = false; - self.partition_pending(instance) - } - - /// Iterate over `InstanceState::pending`, moving any ready items into the - /// "high priority" work item queue. - /// - /// See `GuestCall::is_ready` for details. - fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { - for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { - let call = GuestCall { task, kind }; - if call.is_ready(self)? { - self.push_high_priority(WorkItem::GuestCall(call)); - } else { - self.instance_state(instance) - .pending - .insert(call.task, call.kind); - } - } - - Ok(()) - } - /// Get the next pending event for the specified task and (optional) /// waitable set, along with the waitable handle if applicable. fn get_event( - &mut self, + mut self: Pin<&mut Self>, guest_task: TableId, instance: RuntimeComponentInstanceIndex, set: Option>, ) -> Result)>> { + let state = self.as_mut().concurrent_state_mut(); + Ok( - if let Some(event) = self.get_mut(guest_task)?.event.take() { + if let Some(event) = state.get_mut(guest_task)?.event.take() { log::trace!("deliver event {event:?} to {guest_task:?}"); Some((event, None)) } else if let Some((set, waitable)) = set .and_then(|set| { - self.get_mut(set) + state + .get_mut(set) .map(|v| v.ready.pop_first().map(|v| (set, v))) .transpose() }) .transpose()? { - let event = waitable.common(self)?.event.take().unwrap(); + let event = waitable.common(state)?.event.take().unwrap(); log::trace!( "deliver event {event:?} to {guest_task:?} for {waitable:?}; set {set:?}" ); - let entry = self.waitable_tables[instance].get_mut_by_rep(waitable.rep()); - let Some(( - handle, - WaitableState::HostTask - | WaitableState::GuestTask - | WaitableState::Stream(..) - | WaitableState::Future(..), - )) = entry - else { - bail!("handle not found for waitable rep {waitable:?} instance {instance:?}"); - }; + let handle = + self.as_mut().guest_tables().0[instance].waitable_by_rep(waitable.rep())?; waitable.on_delivery(self, event); @@ -1027,52 +881,33 @@ impl ConcurrentState { ) } - /// Implements the `backpressure.set` intrinsic. - pub(crate) fn backpressure_set( - &mut self, - caller_instance: RuntimeComponentInstanceIndex, - enabled: u32, - ) -> Result<()> { - let state = self.instance_state(caller_instance); - let old = state.backpressure; - let new = enabled != 0; - state.backpressure = new; - - if old && !new { - // Backpressure was previously enabled and is now disabled; move any - // newly-eligible guest calls to the "high priority" queue. - self.partition_pending(caller_instance)?; - } - - Ok(()) - } - /// Implements the `waitable-set.new` intrinsic. pub(crate) fn waitable_set_new( - &mut self, + mut self: Pin<&mut Self>, caller_instance: RuntimeComponentInstanceIndex, ) -> Result { - let set = self.push(WaitableSet::default())?; - let handle = self.waitable_tables[caller_instance].insert(set.rep(), WaitableState::Set)?; + let set = self + .as_mut() + .concurrent_state_mut() + .push(WaitableSet::default())?; + let handle = self.guest_tables().0[caller_instance].waitable_set_insert(set.rep())?; log::trace!("new waitable set {set:?} (handle {handle})"); Ok(handle) } /// Implements the `waitable-set.drop` intrinsic. pub(crate) fn waitable_set_drop( - &mut self, + mut self: Pin<&mut Self>, caller_instance: RuntimeComponentInstanceIndex, set: u32, ) -> Result<()> { - let (rep, WaitableState::Set) = - self.waitable_tables[caller_instance].remove_by_index(set)? - else { - bail!("invalid waitable-set handle"); - }; + let rep = self.as_mut().guest_tables().0[caller_instance].waitable_set_remove(set)?; log::trace!("drop waitable set {rep} (handle {set})"); - let set = self.delete(TableId::::new(rep))?; + let set = self + .concurrent_state_mut() + .delete(TableId::::new(rep))?; if !set.waiting.is_empty() { bail!("cannot drop waitable set with waiters"); @@ -1083,21 +918,18 @@ impl ConcurrentState { /// Implements the `waitable.join` intrinsic. pub(crate) fn waitable_join( - &mut self, + mut self: Pin<&mut Self>, caller_instance: RuntimeComponentInstanceIndex, waitable_handle: u32, set_handle: u32, ) -> Result<()> { - let waitable = Waitable::from_instance(self, caller_instance, waitable_handle)?; + let waitable = Waitable::from_instance(self.as_mut(), caller_instance, waitable_handle)?; let set = if set_handle == 0 { None } else { - let (set, WaitableState::Set) = - self.waitable_tables[caller_instance].get_mut_by_index(set_handle)? - else { - bail!("invalid waitable-set handle"); - }; + let set = + self.as_mut().guest_tables().0[caller_instance].waitable_set_rep(set_handle)?; Some(TableId::::new(set)) }; @@ -1106,49 +938,47 @@ impl ConcurrentState { "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", ); - waitable.join(self, set) + waitable.join(self.concurrent_state_mut(), set) } /// Implements the `subtask.drop` intrinsic. pub(crate) fn subtask_drop( - &mut self, + mut self: Pin<&mut Self>, caller_instance: RuntimeComponentInstanceIndex, task_id: u32, ) -> Result<()> { - self.waitable_join(caller_instance, task_id, 0)?; + self.as_mut().waitable_join(caller_instance, task_id, 0)?; - let (rep, state) = self.waitable_tables[caller_instance].remove_by_index(task_id)?; + let (rep, is_host) = + self.as_mut().guest_tables().0[caller_instance].subtask_remove(task_id)?; - let (waitable, expected_caller_instance, delete) = match state { - WaitableState::HostTask => { - let id = TableId::::new(rep); - let task = self.get(id)?; - if task.join_handle.is_some() { - bail!("cannot drop a subtask which has not yet resolved"); - } - (Waitable::Host(id), task.caller_instance, true) + let concurrent_state = self.concurrent_state_mut(); + let (waitable, expected_caller_instance, delete) = if is_host { + let id = TableId::::new(rep); + let task = concurrent_state.get(id)?; + if task.join_handle.is_some() { + bail!("cannot drop a subtask which has not yet resolved"); } - WaitableState::GuestTask => { - let id = TableId::::new(rep); - let task = self.get(id)?; - if task.lift_result.is_some() { - bail!("cannot drop a subtask which has not yet resolved"); - } - if let Caller::Guest { instance, .. } = &task.caller { - (Waitable::Guest(id), *instance, task.exited) - } else { - unreachable!() - } + (Waitable::Host(id), task.caller_instance, true) + } else { + let id = TableId::::new(rep); + let task = concurrent_state.get(id)?; + if task.lift_result.is_some() { + bail!("cannot drop a subtask which has not yet resolved"); + } + if let Caller::Guest { instance, .. } = &task.caller { + (Waitable::Guest(id), *instance, task.exited) + } else { + unreachable!() } - _ => bail!("invalid task handle: {task_id}"), }; - if waitable.take_event(self)?.is_some() { + if waitable.take_event(concurrent_state)?.is_some() { bail!("cannot drop a subtask with an undelivered event"); } if delete { - waitable.delete_from(self)?; + waitable.delete_from(concurrent_state)?; } // Since waitables can neither be passed between instances nor forged, @@ -1158,26 +988,6 @@ impl ConcurrentState { log::trace!("subtask_drop {waitable:?} (handle {task_id})"); Ok(()) } - - /// Implements the `context.get` intrinsic. - pub(crate) fn context_get(&mut self, slot: u32) -> Result { - let task = self.guest_task.unwrap(); - let val = self.get(task)?.context[usize::try_from(slot).unwrap()]; - log::trace!("context_get {task:?} slot {slot} val {val:#x}"); - Ok(val) - } - - /// Implements the `context.set` intrinsic. - pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { - let task = self.guest_task.unwrap(); - log::trace!("context_set {task:?} slot {slot} val {val:#x}"); - self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val; - Ok(()) - } - - fn options(&self, options: OptionsIndex) -> &CanonicalOptions { - &self.component.env_component().options[options] - } } impl Instance { @@ -1208,10 +1018,16 @@ impl Instance { /// been dropped by their supertasks. #[doc(hidden)] pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) { - let state = self - .id() - .get_mut(store.as_context_mut().0) - .concurrent_state_mut(); + let mut instance = self.id().get_mut(store.as_context_mut().0); + assert!( + instance + .as_mut() + .guest_tables() + .0 + .iter() + .all(|(_, table)| table.is_empty()) + ); + let state = instance.concurrent_state_mut(); assert!(state.table.is_empty(), "non-empty table: {:?}", state.table); assert!(state.high_priority.is_empty()); assert!(state.low_priority.is_empty()); @@ -1225,24 +1041,12 @@ impl Instance { .unwrap() .is_empty() ); - assert!( - state - .waitable_tables - .iter() - .all(|(_, table)| table.is_empty()) - ); assert!( state .instance_states .iter() .all(|(_, state)| state.pending.is_empty()) ); - assert!( - state - .error_context_tables - .iter() - .all(|(_, table)| table.is_empty()) - ); assert!(state.global_error_context_ref_counts.is_empty()); } @@ -1687,8 +1491,12 @@ impl Instance { instance: runtime_instance, set, } => { + let (event, waitable) = self + .id() + .get_mut(store) + .get_event(call.task, runtime_instance, set)? + .unwrap(); let state = self.concurrent_state_mut(store); - let (event, waitable) = state.get_event(call.task, runtime_instance, set)?.unwrap(); let task = state.get_mut(call.task)?; let runtime_instance = task.instance; let handle = waitable.map(|(_, v)| v).unwrap_or(0); @@ -1721,10 +1529,14 @@ impl Instance { self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?; - let state = self.concurrent_state_mut(store); - state.handle_callback_code(call.task, runtime_instance, code, false)?; + self.id().get_mut(store).handle_callback_code( + call.task, + runtime_instance, + code, + false, + )?; - state.guest_task = old_task; + self.concurrent_state_mut(store).guest_task = old_task; log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task"); } GuestCallKind::Start(fun) => { @@ -1949,7 +1761,12 @@ impl Instance { // function returns a `i32` result. let code = unsafe { storage[0].assume_init() }.get_i32() as u32; - state.handle_callback_code(guest_task, callee_instance, code, true)?; + instance.id().get_mut(store).handle_callback_code( + guest_task, + callee_instance, + code, + true, + )?; Ok(()) }) @@ -2457,8 +2274,8 @@ impl Instance { break ( status, Some( - state.waitable_tables[caller_instance] - .insert(guest_task.rep(), WaitableState::GuestTask)?, + self.id().get_mut(store.0).guest_tables().0[caller_instance] + .subtask_insert_guest(guest_task.rep())?, ), ); } else { @@ -2639,10 +2456,9 @@ impl Instance { // It hasn't finished yet; add the future to // `ConcurrentState::futures` so it will be polled by the event // loop and allocate a waitable handle to return to the guest. - let state = self.concurrent_state_mut(store.0); - state.push_future(future); - let handle = state.waitable_tables[caller_instance] - .insert(task.rep(), WaitableState::HostTask)?; + self.concurrent_state_mut(store.0).push_future(future); + let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance] + .subtask_insert_host(task.rep())?; log::trace!( "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}" ); @@ -2904,15 +2720,11 @@ impl Instance { set: u32, payload: u32, ) -> Result { - let state = self.concurrent_state_mut(store); - let opts = state.options(options); + let opts = self.concurrent_state_mut(store).options(options); let async_ = opts.async_; let caller_instance = opts.instance; - let (rep, WaitableState::Set) = - state.waitable_tables[caller_instance].get_mut_by_index(set)? - else { - bail!("invalid waitable-set handle"); - }; + let rep = + self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?; self.waitable_check( store, @@ -2934,15 +2746,11 @@ impl Instance { set: u32, payload: u32, ) -> Result { - let state = self.concurrent_state_mut(store); - let opts = state.options(options); + let opts = self.concurrent_state_mut(store).options(options); let async_ = opts.async_; let caller_instance = opts.instance; - let (rep, WaitableState::Set) = - state.waitable_tables[caller_instance].get_mut_by_index(set)? - else { - bail!("invalid waitable-set handle"); - }; + let rep = + self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?; self.waitable_check( store, @@ -3024,7 +2832,7 @@ impl Instance { let result = match check { // Deliver any pending events to the guest and return. WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => { - let event = self.concurrent_state_mut(store).get_event( + let event = self.id().get_mut(store).get_event( guest_task, params.caller_instance, Some(params.set), @@ -3077,26 +2885,23 @@ impl Instance { async_: bool, task_id: u32, ) -> Result { - let concurrent_state = self.concurrent_state_mut(store); - let (rep, state) = - concurrent_state.waitable_tables[caller_instance].get_mut_by_index(task_id)?; - let (waitable, expected_caller_instance) = match state { - WaitableState::HostTask => { - let id = TableId::::new(rep); - ( - Waitable::Host(id), - concurrent_state.get(id)?.caller_instance, - ) - } - WaitableState::GuestTask => { - let id = TableId::::new(rep); - if let Caller::Guest { instance, .. } = &concurrent_state.get(id)?.caller { - (Waitable::Guest(id), *instance) - } else { - unreachable!() - } + let (rep, is_host) = + self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?; + let (waitable, expected_caller_instance) = if is_host { + let id = TableId::::new(rep); + ( + Waitable::Host(id), + self.concurrent_state_mut(store).get(id)?.caller_instance, + ) + } else { + let id = TableId::::new(rep); + if let Caller::Guest { instance, .. } = + &self.concurrent_state_mut(store).get(id)?.caller + { + (Waitable::Guest(id), *instance) + } else { + unreachable!() } - _ => bail!("invalid task handle: {task_id}"), }; // Since waitables can neither be passed between instances nor forged, // this should never fail unless there's a bug in Wasmtime, but we check @@ -3105,6 +2910,7 @@ impl Instance { log::trace!("subtask_cancel {waitable:?} (handle {task_id})"); + let concurrent_state = self.concurrent_state_mut(store); if let Waitable::Host(host_task) = waitable { if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() { handle.abort(); @@ -3487,7 +3293,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_write( StoreContextMut(self), - TableIndex::Future(ty), + TransmitIndex::Future(ty), options, None, future, @@ -3508,7 +3314,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_read( StoreContextMut(self), - TableIndex::Future(ty), + TransmitIndex::Future(ty), options, None, future, @@ -3530,7 +3336,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_write( StoreContextMut(self), - TableIndex::Stream(ty), + TransmitIndex::Stream(ty), options, None, stream, @@ -3552,7 +3358,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_read( StoreContextMut(self), - TableIndex::Stream(ty), + TransmitIndex::Stream(ty), options, None, stream, @@ -3568,7 +3374,7 @@ impl VMComponentAsyncStore for StoreInner { ty: TypeFutureTableIndex, writer: u32, ) -> Result<()> { - instance.guest_drop_writable(StoreContextMut(self), TableIndex::Future(ty), writer) + instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Future(ty), writer) } fn flat_stream_write( @@ -3585,7 +3391,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_write( StoreContextMut(self), - TableIndex::Stream(ty), + TransmitIndex::Stream(ty), options, Some(FlatAbi { size: payload_size, @@ -3612,7 +3418,7 @@ impl VMComponentAsyncStore for StoreInner { instance .guest_read( StoreContextMut(self), - TableIndex::Stream(ty), + TransmitIndex::Stream(ty), options, Some(FlatAbi { size: payload_size, @@ -3631,7 +3437,7 @@ impl VMComponentAsyncStore for StoreInner { ty: TypeStreamTableIndex, writer: u32, ) -> Result<()> { - instance.guest_drop_writable(StoreContextMut(self), TableIndex::Stream(ty), writer) + instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Stream(ty), writer) } fn error_context_debug_message( @@ -3921,20 +3727,18 @@ impl Waitable { /// Retrieve the `Waitable` corresponding to the specified guest-visible /// handle. fn from_instance( - state: &mut ConcurrentState, + state: Pin<&mut ComponentInstance>, caller_instance: RuntimeComponentInstanceIndex, waitable: u32, ) -> Result { - let (waitable, state) = - state.waitable_tables[caller_instance].get_mut_by_index(waitable)?; - - Ok(match state { - WaitableState::HostTask => Waitable::Host(TableId::new(waitable)), - WaitableState::GuestTask => Waitable::Guest(TableId::new(waitable)), - WaitableState::Stream(..) | WaitableState::Future(..) => { - Waitable::Transmit(TableId::new(waitable)) - } - _ => bail!("invalid waitable handle"), + use crate::runtime::vm::component::Waitable; + + let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?; + + Ok(match kind { + Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)), + Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)), + Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)), }) } @@ -4036,7 +3840,7 @@ impl Waitable { /// Handle the imminent delivery of the specified event, e.g. by updating /// the state of the stream or future. - fn on_delivery(&self, state: &mut ConcurrentState, event: Event) { + fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) { match event { Event::FutureRead { pending: Some((ty, handle)), @@ -4046,20 +3850,15 @@ impl Waitable { pending: Some((ty, handle)), .. } => { - let runtime_instance = state.component.types()[ty].instance; - let (rep, WaitableState::Future(actual_ty, state)) = state.waitable_tables - [runtime_instance] - .get_mut_by_index(handle) - .unwrap() - else { - unreachable!() - }; - assert_eq!(*actual_ty, ty); + let runtime_instance = instance.component().types()[ty].instance; + let (rep, state) = instance.guest_tables().0[runtime_instance] + .future_rep(ty, handle) + .unwrap(); assert_eq!(rep, self.rep()); - assert_eq!(*state, StreamFutureState::Busy); + assert_eq!(*state, TransmitLocalState::Busy); *state = match event { - Event::FutureRead { .. } => StreamFutureState::Read { done: false }, - Event::FutureWrite { .. } => StreamFutureState::Write { done: false }, + Event::FutureRead { .. } => TransmitLocalState::Read { done: false }, + Event::FutureWrite { .. } => TransmitLocalState::Write { done: false }, _ => unreachable!(), }; } @@ -4071,21 +3870,16 @@ impl Waitable { pending: Some((ty, handle)), code, } => { - let runtime_instance = state.component.types()[ty].instance; - let (rep, WaitableState::Stream(actual_ty, state)) = state.waitable_tables - [runtime_instance] - .get_mut_by_index(handle) - .unwrap() - else { - unreachable!() - }; - assert_eq!(*actual_ty, ty); + let runtime_instance = instance.component().types()[ty].instance; + let (rep, state) = instance.guest_tables().0[runtime_instance] + .stream_rep(ty, handle) + .unwrap(); assert_eq!(rep, self.rep()); - assert_eq!(*state, StreamFutureState::Busy); + assert_eq!(*state, TransmitLocalState::Busy); let done = matches!(code, ReturnCode::Dropped(_)); *state = match event { - Event::StreamRead { .. } => StreamFutureState::Read { done }, - Event::StreamWrite { .. } => StreamFutureState::Write { done }, + Event::StreamRead { .. } => TransmitLocalState::Read { done }, + Event::StreamWrite { .. } => TransmitLocalState::Write { done }, _ => unreachable!(), }; } @@ -4210,9 +4004,6 @@ pub struct ConcurrentState { /// populated as needed. // TODO: this can and should be a `PrimaryMap` instance_states: HashMap, - /// Tables for tracking per-(sub-)component waitable handles and their - /// states. - waitable_tables: PrimaryMap>, /// The "high priority" work queue for this instance's event loop. high_priority: Vec, /// The "high priority" work queue for this instance's event loop. @@ -4228,18 +4019,6 @@ pub struct ConcurrentState { /// A place to stash the work item for which we're resuming a worker fiber. worker_item: Option, - /// (Sub)Component specific error context tracking - /// - /// At the component level, only the number of references (`usize`) to a given error context is tracked, - /// with state related to the error context being held at the component model level, in concurrent - /// state. - /// - /// The state tables in the (sub)component local tracking must contain a pointer into the global - /// error context lookups in order to ensure that in contexts where only the local reference is present - /// the global state can still be maintained/updated. - error_context_tables: - PrimaryMap>, - /// Reference counts for all component error contexts /// /// NOTE: it is possible the global ref count to be *greater* than the sum of @@ -4262,34 +4041,16 @@ pub struct ConcurrentState { impl ConcurrentState { pub(crate) fn new(component: &Component) -> Self { - let num_waitable_tables = component.env_component().num_runtime_component_instances; - let num_error_context_tables = component.env_component().num_error_context_tables; - let mut waitable_tables = - PrimaryMap::with_capacity(usize::try_from(num_waitable_tables).unwrap()); - for _ in 0..num_waitable_tables { - waitable_tables.push(StateTable::default()); - } - - let mut error_context_tables = PrimaryMap::< - TypeComponentLocalErrorContextTableIndex, - StateTable, - >::with_capacity(num_error_context_tables); - for _ in 0..num_error_context_tables { - error_context_tables.push(StateTable::default()); - } - Self { guest_task: None, table: Table::new(), futures: Mutex::new(Some(FuturesUnordered::new())), instance_states: HashMap::new(), - waitable_tables, high_priority: Vec::new(), low_priority: Vec::new(), suspend_reason: None, worker: None, worker_item: None, - error_context_tables, global_error_context_ref_counts: BTreeMap::new(), component: component.clone(), } @@ -4356,6 +4117,169 @@ impl ConcurrentState { futures.push(them); } } + + fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState { + self.instance_states.entry(instance).or_default() + } + + fn push(&mut self, value: V) -> Result, TableError> { + self.table.push(value) + } + + fn get(&self, id: TableId) -> Result<&V, TableError> { + self.table.get(id) + } + + fn get_mut(&mut self, id: TableId) -> Result<&mut V, TableError> { + self.table.get_mut(id) + } + + pub fn add_child( + &mut self, + child: TableId, + parent: TableId, + ) -> Result<(), TableError> { + self.table.add_child(child, parent) + } + + pub fn remove_child( + &mut self, + child: TableId, + parent: TableId, + ) -> Result<(), TableError> { + self.table.remove_child(child, parent) + } + + fn delete(&mut self, id: TableId) -> Result { + self.table.delete(id) + } + + fn push_future(&mut self, future: HostTaskFuture) { + // Note that we can't directly push to `ConcurrentState::futures` here + // since this may be called from a future that's being polled inside + // `Self::poll_until`, which temporarily removes the `FuturesUnordered` + // so it has exclusive access while polling it. Therefore, we push a + // work item to the "high priority" queue, which will actually push to + // `ConcurrentState::futures` later. + self.push_high_priority(WorkItem::PushFuture(Mutex::new(future))); + } + + fn push_high_priority(&mut self, item: WorkItem) { + log::trace!("push high priority: {item:?}"); + self.high_priority.push(item); + } + + fn push_low_priority(&mut self, item: WorkItem) { + log::trace!("push low priority: {item:?}"); + self.low_priority.push(item); + } + + /// Determine whether the instance associated with the specified guest task + /// may be entered (i.e. is not already on the async call stack). + /// + /// This is an additional check on top of the "may_enter" instance flag; + /// it's needed because async-lifted exports with callback functions must + /// not call their own instances directly or indirectly, and due to the + /// "stackless" nature of callback-enabled guest tasks this may happen even + /// if there are no activation records on the stack (i.e. the "may_enter" + /// field is `true`) for that instance. + fn may_enter(&mut self, mut guest_task: TableId) -> bool { + let guest_instance = self.get(guest_task).unwrap().instance; + + // Walk the task tree back to the root, looking for potential + // reentrance. + // + // TODO: This could be optimized by maintaining a per-`GuestTask` bitset + // such that each bit represents and instance which has been entered by + // that task or an ancestor of that task, in which case this would be a + // constant time check. + loop { + match &self.get_mut(guest_task).unwrap().caller { + Caller::Host { .. } => break true, + Caller::Guest { task, instance } => { + if *instance == guest_instance { + break false; + } else { + guest_task = *task; + } + } + } + } + } + + /// Record that we're about to enter a (sub-)component instance which does + /// not support more than one concurrent, stackful activation, meaning it + /// cannot be entered again until the next call returns. + fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) { + self.instance_state(instance).do_not_enter = true; + } + + /// Record that we've exited a (sub-)component instance previously entered + /// with `Self::enter_instance` and then calls `Self::partition_pending`. + /// See the documentation for the latter for details. + fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { + self.instance_state(instance).do_not_enter = false; + self.partition_pending(instance) + } + + /// Iterate over `InstanceState::pending`, moving any ready items into the + /// "high priority" work item queue. + /// + /// See `GuestCall::is_ready` for details. + fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { + for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { + let call = GuestCall { task, kind }; + if call.is_ready(self)? { + self.push_high_priority(WorkItem::GuestCall(call)); + } else { + self.instance_state(instance) + .pending + .insert(call.task, call.kind); + } + } + + Ok(()) + } + + /// Implements the `backpressure.set` intrinsic. + pub(crate) fn backpressure_set( + &mut self, + caller_instance: RuntimeComponentInstanceIndex, + enabled: u32, + ) -> Result<()> { + let state = self.instance_state(caller_instance); + let old = state.backpressure; + let new = enabled != 0; + state.backpressure = new; + + if old && !new { + // Backpressure was previously enabled and is now disabled; move any + // newly-eligible guest calls to the "high priority" queue. + self.partition_pending(caller_instance)?; + } + + Ok(()) + } + + /// Implements the `context.get` intrinsic. + pub(crate) fn context_get(&mut self, slot: u32) -> Result { + let task = self.guest_task.unwrap(); + let val = self.get(task)?.context[usize::try_from(slot).unwrap()]; + log::trace!("context_get {task:?} slot {slot} val {val:#x}"); + Ok(val) + } + + /// Implements the `context.set` intrinsic. + pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { + let task = self.guest_task.unwrap(); + log::trace!("context_set {task:?} slot {slot} val {val:#x}"); + self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val; + Ok(()) + } + + fn options(&self, options: OptionsIndex) -> &CanonicalOptions { + &self.component.env_component().options[options] + } } /// Provide a type hint to compiler about the shape of a parameter lower diff --git a/crates/wasmtime/src/runtime/component/concurrent/error_contexts.rs b/crates/wasmtime/src/runtime/component/concurrent/error_contexts.rs index 435197f79e5e..603d801e8ffb 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/error_contexts.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/error_contexts.rs @@ -1,11 +1,3 @@ -/// Error context reference count local to a given (sub)component -/// -/// This reference count is localized to a single (sub)component, -/// rather than the global cross-component count (i.e. that determines -/// when a error context can be completely removed) -#[repr(transparent)] -pub struct LocalErrorContextRefCount(pub(crate) usize); - /// Error context reference count across a [`ComponentInstance`] /// /// Contrasted to `LocalErrorContextRefCount`, this count is maintained diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 0433bbb2fa85..3f6d970904f7 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -1,8 +1,5 @@ use super::table::{TableDebug, TableId}; -use super::{ - Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable, - WaitableCommon, WaitableState, -}; +use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon}; use crate::component::concurrent::{ConcurrentState, WorkItem}; use crate::component::func::{self, LiftContext, LowerContext, Options}; use crate::component::matching::InstanceType; @@ -10,6 +7,7 @@ use crate::component::values::{ErrorContextAny, FutureAny, StreamAny}; use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr}; use crate::store::{StoreOpaque, StoreToken}; use crate::vm::VMStore; +use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState}; use crate::{AsContextMut, StoreContextMut, ValRaw}; use anyhow::{Context, Result, anyhow, bail}; use buffers::Extender; @@ -21,12 +19,13 @@ use std::future; use std::iter; use std::marker::PhantomData; use std::mem::{self, MaybeUninit}; +use std::pin::Pin; use std::string::{String, ToString}; use std::sync::{Arc, Mutex}; use std::task::{Poll, Waker}; use std::vec::Vec; use wasmtime_environ::component::{ - CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex, + CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex, }; @@ -38,7 +37,7 @@ mod buffers; /// Enum for distinguishing between a stream or future in functions that handle /// both. #[derive(Copy, Clone, Debug)] -enum TransmitKind { +pub enum TransmitKind { Stream, Future, } @@ -95,16 +94,16 @@ impl ReturnCode { /// This is useful as a parameter type for functions which operate on either a /// future or a stream. #[derive(Copy, Clone, Debug)] -pub(super) enum TableIndex { +pub enum TransmitIndex { Stream(TypeStreamTableIndex), Future(TypeFutureTableIndex), } -impl TableIndex { - fn kind(&self) -> TransmitKind { +impl TransmitIndex { + pub fn kind(&self) -> TransmitKind { match self { - TableIndex::Stream(_) => TransmitKind::Stream, - TableIndex::Future(_) => TransmitKind::Future, + TransmitIndex::Stream(_) => TransmitKind::Stream, + TransmitIndex::Future(_) => TransmitKind::Future, } } } @@ -127,51 +126,23 @@ struct HostResult { /// Retrieve the payload type of the specified stream or future, or `None` if it /// has no payload type. -fn payload(ty: TableIndex, types: &Arc) -> Option { +fn payload(ty: TransmitIndex, types: &Arc) -> Option { match ty { - TableIndex::Future(ty) => types[types[ty].ty].payload, - TableIndex::Stream(ty) => types[types[ty].ty].payload, + TransmitIndex::Future(ty) => types[types[ty].ty].payload, + TransmitIndex::Stream(ty) => types[types[ty].ty].payload, } } /// Retrieve the host rep and state for the specified guest-visible waitable /// handle. fn get_mut_by_index_from( - state_table: &mut StateTable, - ty: TableIndex, + handle_table: &mut HandleTable, + ty: TransmitIndex, index: u32, -) -> Result<(u32, &mut StreamFutureState)> { - Ok(match ty { - TableIndex::Stream(ty) => { - let (rep, WaitableState::Stream(actual_ty, state)) = - state_table.get_mut_by_index(index)? - else { - bail!("invalid stream handle"); - }; - if *actual_ty != ty { - bail!("invalid stream handle"); - } - (rep, state) - } - TableIndex::Future(ty) => { - let (rep, WaitableState::Future(actual_ty, state)) = - state_table.get_mut_by_index(index)? - else { - bail!("invalid future handle"); - }; - if *actual_ty != ty { - bail!("invalid future handle"); - } - (rep, state) - } - }) -} - -/// Construct a `WaitableState` using the specified type and state. -fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState { +) -> Result<(u32, &mut TransmitLocalState)> { match ty { - TableIndex::Stream(ty) => WaitableState::Stream(ty, state), - TableIndex::Future(ty) => WaitableState::Future(ty, state), + TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index), + TransmitIndex::Future(ty) => handle_table.future_rep(ty, index), } } @@ -369,28 +340,6 @@ async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId .await } -/// Represents the state of a stream or future handle from the perspective of a -/// given component instance. -#[derive(Debug, Eq, PartialEq)] -pub(super) enum StreamFutureState { - /// The write end of the stream or future. - Write { - /// Whether the component instance has been notified that the stream or - /// future is "done" (i.e. the other end has dropped, or, in the case of - /// a future, a value has been transmitted). - done: bool, - }, - /// The read end of the stream or future. - Read { - /// Whether the component instance has been notified that the stream or - /// future is "done" (i.e. the other end has dropped, or, in the case of - /// a future, a value has been transmitted). - done: bool, - }, - /// A read or write is in progress. - Busy, -} - /// Represents the state associated with an error context #[derive(Debug, PartialEq, Eq, PartialOrd)] pub(super) struct ErrorContextState { @@ -697,21 +646,13 @@ impl FutureReader { fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result { match ty { InterfaceType::Future(src) => { - let state_table = cx + let handle_table = cx .instance_mut() - .concurrent_state_mut() - .state_table(TableIndex::Future(src)); - let (rep, state) = - get_mut_by_index_from(state_table, TableIndex::Future(src), index)?; - - match state { - StreamFutureState::Read { .. } => { - state_table.remove_by_index(index)?; - } - StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"), - StreamFutureState::Busy => bail!("cannot transfer busy future"), + .table_for_transmit(TransmitIndex::Future(src)); + let (rep, is_done) = handle_table.future_remove_readable(src, index)?; + if is_done { + bail!("cannot lift future after being notified that the writable end dropped"); } - let id = TableId::::new(rep); let concurrent_state = cx.instance_mut().concurrent_state_mut(); let state = concurrent_state.get(id)?.state; @@ -786,12 +727,9 @@ pub(crate) fn lower_future_to_index( .state; let rep = concurrent_state.get(state)?.read_handle.rep(); - concurrent_state - .state_table(TableIndex::Future(dst)) - .insert( - rep, - WaitableState::Future(dst, StreamFutureState::Read { done: false }), - ) + cx.instance_mut() + .table_for_transmit(TransmitIndex::Future(dst)) + .future_insert_read(dst, rep) } _ => func::bad_type_info(), } @@ -1274,26 +1212,14 @@ impl StreamReader { fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result { match ty { InterfaceType::Stream(src) => { - let state_table = cx + let handle_table = cx .instance_mut() - .concurrent_state_mut() - .state_table(TableIndex::Stream(src)); - let (rep, state) = - get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?; - - match state { - StreamFutureState::Read { done: true } => bail!( - "cannot lift stream after being notified that the writable end dropped" - ), - StreamFutureState::Read { done: false } => { - state_table.remove_by_index(index)?; - } - StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"), - StreamFutureState::Busy => bail!("cannot transfer busy stream"), + .table_for_transmit(TransmitIndex::Stream(src)); + let (rep, is_done) = handle_table.stream_remove_readable(src, index)?; + if is_done { + bail!("cannot lift stream after being notified that the writable end dropped"); } - let id = TableId::::new(rep); - Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), @@ -1360,12 +1286,9 @@ pub(crate) fn lower_stream_to_index( .state; let rep = concurrent_state.get(state)?.read_handle.rep(); - concurrent_state - .state_table(TableIndex::Stream(dst)) - .insert( - rep, - WaitableState::Stream(dst, StreamFutureState::Read { done: false }), - ) + cx.instance_mut() + .table_for_transmit(TransmitIndex::Stream(dst)) + .stream_insert_read(dst, rep) } _ => func::bad_type_info(), } @@ -1545,13 +1468,10 @@ impl ErrorContext { fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result { match ty { InterfaceType::ErrorContext(src) => { - let (rep, _) = cx + let rep = cx .instance_mut() - .concurrent_state_mut() - .error_context_tables - .get_mut(src) - .expect("error context table index present in (sub)component table during lift") - .get_mut_by_index(index)?; + .table_for_error_context(src) + .error_context_rep(index)?; Ok(Self { rep }) } @@ -1567,24 +1487,12 @@ pub(crate) fn lower_error_context_to_index( ) -> Result { match ty { InterfaceType::ErrorContext(dst) => { - let tbl = cx - .instance_mut() - .concurrent_state_mut() - .error_context_tables - .get_mut(dst) - .expect("error context table index present in (sub)component table during lower"); - - if let Some((dst_idx, dst_state)) = tbl.get_mut_by_rep(rep) { - dst_state.0 += 1; - Ok(dst_idx) - } else { - tbl.insert(rep, LocalErrorContextRefCount(1)) - } + let tbl = cx.instance_mut().table_for_error_context(dst); + tbl.error_context_insert(rep) } _ => func::bad_type_info(), } } - // SAFETY: This relies on the `ComponentType` implementation for `u32` being // safe and correct since we lift and lower future handles as `u32`s. unsafe impl func::ComponentType for ErrorContext { @@ -1720,7 +1628,7 @@ enum WriteState { Open, /// The write end is owned by a guest task and a write is pending. GuestReady { - ty: TableIndex, + ty: TransmitIndex, flat_abi: Option, options: Options, address: usize, @@ -1755,7 +1663,7 @@ enum ReadState { Open, /// The read end is owned by a guest task and a read is pending. GuestReady { - ty: TableIndex, + ty: TransmitIndex, flat_abi: Option, options: Options, address: usize, @@ -1808,7 +1716,7 @@ enum Reader<'a> { /// The read end is owned by a guest task. Guest { options: &'a Options, - ty: TableIndex, + ty: TransmitIndex, address: usize, count: usize, }, @@ -1942,11 +1850,11 @@ impl Instance { self.concurrent_state_mut(store.0).set_event( read_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureRead { + TransmitIndex::Future(ty) => Event::FutureRead { code, pending: Some((ty, handle)), }, - TableIndex::Stream(ty) => Event::StreamRead { + TransmitIndex::Stream(ty) => Event::StreamRead { code, pending: Some((ty, handle)), }, @@ -2072,7 +1980,7 @@ impl Instance { post_write, .. } => { - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2100,11 +2008,11 @@ impl Instance { state.set_event( write_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureWrite { + TransmitIndex::Future(ty) => Event::FutureWrite { code, pending: pending.then_some((ty, handle)), }, - TableIndex::Stream(ty) => Event::StreamWrite { + TransmitIndex::Stream(ty) => Event::StreamWrite { code, pending: pending.then_some((ty, handle)), }, @@ -2211,11 +2119,11 @@ impl Instance { state.update_event( write_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureWrite { + TransmitIndex::Future(ty) => Event::FutureWrite { code: ReturnCode::Dropped(0), pending: Some((ty, handle)), }, - TableIndex::Stream(ty) => Event::StreamWrite { + TransmitIndex::Stream(ty) => Event::StreamWrite { code: ReturnCode::Dropped(0), pending: Some((ty, handle)), }, @@ -2328,11 +2236,11 @@ impl Instance { self.concurrent_state_mut(store.0).update_event( read_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureRead { + TransmitIndex::Future(ty) => Event::FutureRead { code: ReturnCode::Dropped(0), pending: Some((ty, handle)), }, - TableIndex::Stream(ty) => Event::StreamRead { + TransmitIndex::Stream(ty) => Event::StreamRead { code: ReturnCode::Dropped(0), pending: Some((ty, handle)), }, @@ -2378,36 +2286,22 @@ impl Instance { pub(super) fn guest_drop_writable( self, store: StoreContextMut, - ty: TableIndex, + ty: TransmitIndex, writer: u32, ) -> Result<()> { - let (transmit_rep, state) = self - .concurrent_state_mut(store.0) - .state_table(ty) - .remove_by_index(writer) - .context("failed to find writer")?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => { - bail!("invalid stream or future handle"); - } + let table = self.id().get_mut(store.0).table_for_transmit(ty); + let transmit_rep = match ty { + TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?, + TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?, }; - match state { - StreamFutureState::Write { .. } => {} - StreamFutureState::Read { .. } => { - bail!("passed read end to `{{stream|future}}.drop-writable`") - } - StreamFutureState::Busy => bail!("cannot drop busy stream or future"), - } let id = TableId::::new(transmit_rep); log::trace!("guest_drop_writable: drop writer {id:?}"); - match kind { - TransmitKind::Stream => { + match ty { + TransmitIndex::Stream(_) => { self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>) } - TransmitKind::Future => self.host_drop_writer( + TransmitIndex::Future(_) => self.host_drop_writer( store, id, Some(&|| { @@ -2425,10 +2319,10 @@ impl Instance { self, mut store: StoreContextMut, flat_abi: Option, - write_ty: TableIndex, + write_ty: TransmitIndex, write_options: &Options, write_address: usize, - read_ty: TableIndex, + read_ty: TransmitIndex, read_options: &Options, read_address: usize, count: usize, @@ -2436,7 +2330,7 @@ impl Instance { ) -> Result<()> { let types = self.id().get(store.0).component().types().clone(); match (write_ty, read_ty) { - (TableIndex::Future(write_ty), TableIndex::Future(read_ty)) => { + (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => { assert_eq!(count, 1); let val = types[types[write_ty].ty] @@ -2474,7 +2368,7 @@ impl Instance { val.store(lower, ty, ptr)?; } } - (TableIndex::Stream(write_ty), TableIndex::Stream(read_ty)) => { + (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => { if let Some(flat_abi) = flat_abi { // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads: let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count; @@ -2562,7 +2456,7 @@ impl Instance { pub(super) fn guest_write( self, mut store: StoreContextMut, - ty: TableIndex, + ty: TransmitIndex, options: OptionsIndex, flat_abi: Option, handle: u32, @@ -2575,9 +2469,8 @@ impl Instance { if !options.async_() { bail!("synchronous stream and future writes not yet supported"); } - let concurrent_state = self.concurrent_state_mut(store.0); - let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?; - let StreamFutureState::Write { done } = *state else { + let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?; + let TransmitLocalState::Write { done } = *state else { bail!( "invalid handle {handle}; expected `Write`; got {:?}", *state @@ -2588,8 +2481,9 @@ impl Instance { bail!("cannot write to stream after being notified that the readable end dropped"); } - *state = StreamFutureState::Busy; + *state = TransmitLocalState::Busy; let transmit_handle = TableId::::new(rep); + let concurrent_state = self.concurrent_state_mut(store.0); let transmit_id = concurrent_state.get(transmit_handle)?.state; let transmit = concurrent_state.get_mut(transmit_id)?; log::trace!( @@ -2633,7 +2527,7 @@ impl Instance { } => { assert_eq!(flat_abi, read_flat_abi); - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2702,11 +2596,11 @@ impl Instance { concurrent_state.set_event( read_handle_rep, match read_ty { - TableIndex::Future(ty) => Event::FutureRead { + TransmitIndex::Future(ty) => Event::FutureRead { code, pending: Some((ty, read_handle)), }, - TableIndex::Stream(ty) => Event::StreamRead { + TransmitIndex::Stream(ty) => Event::StreamRead { code, pending: Some((ty, read_handle)), }, @@ -2735,7 +2629,7 @@ impl Instance { } ReadState::HostReady { accept } => { - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2754,7 +2648,7 @@ impl Instance { } ReadState::Dropped => { - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2763,13 +2657,13 @@ impl Instance { }; if result != ReturnCode::Blocked { - let state = self.concurrent_state_mut(store.0); - *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write { - done: matches!( - (result, ty), - (ReturnCode::Dropped(_), TableIndex::Stream(_)) - ), - }; + *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 = + TransmitLocalState::Write { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TransmitIndex::Stream(_)) + ), + }; } Ok(result) @@ -2779,7 +2673,7 @@ impl Instance { pub(super) fn guest_read( self, mut store: StoreContextMut, - ty: TableIndex, + ty: TransmitIndex, options: OptionsIndex, flat_abi: Option, handle: u32, @@ -2791,9 +2685,8 @@ impl Instance { if !options.async_() { bail!("synchronous stream and future reads not yet supported"); } - let concurrent_state = self.concurrent_state_mut(store.0); - let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?; - let StreamFutureState::Read { done } = *state else { + let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?; + let TransmitLocalState::Read { done } = *state else { bail!("invalid handle {handle}; expected `Read`; got {:?}", *state); }; @@ -2801,8 +2694,9 @@ impl Instance { bail!("cannot read from stream after being notified that the writable end dropped"); } - *state = StreamFutureState::Busy; + *state = TransmitLocalState::Busy; let transmit_handle = TableId::::new(rep); + let concurrent_state = self.concurrent_state_mut(store.0); let transmit_id = concurrent_state.get(transmit_handle)?.state; let transmit = concurrent_state.get_mut(transmit_id)?; log::trace!( @@ -2846,7 +2740,7 @@ impl Instance { } => { assert_eq!(flat_abi, write_flat_abi); - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2907,11 +2801,11 @@ impl Instance { concurrent_state.set_event( write_handle_rep, match write_ty { - TableIndex::Future(ty) => Event::FutureWrite { + TransmitIndex::Future(ty) => Event::FutureWrite { code, pending: pending.then_some((ty, write_handle)), }, - TableIndex::Stream(ty) => Event::StreamWrite { + TransmitIndex::Stream(ty) => Event::StreamWrite { code, pending: pending.then_some((ty, write_handle)), }, @@ -2941,7 +2835,7 @@ impl Instance { } WriteState::HostReady { accept, post_write } => { - if let TableIndex::Future(_) = ty { + if let TransmitIndex::Future(_) = ty { transmit.done = true; } @@ -2974,13 +2868,13 @@ impl Instance { }; if result != ReturnCode::Blocked { - let state = self.concurrent_state_mut(store.0); - *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read { - done: matches!( - (result, ty), - (ReturnCode::Dropped(_), TableIndex::Stream(_)) - ), - }; + *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 = + TransmitLocalState::Read { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TransmitIndex::Stream(_)) + ), + }; } Ok(result) @@ -2990,25 +2884,18 @@ impl Instance { fn guest_drop_readable( self, store: &mut dyn VMStore, - ty: TableIndex, + ty: TransmitIndex, reader: u32, ) -> Result<()> { - let concurrent_state = self.concurrent_state_mut(store); - let (rep, state) = concurrent_state.state_table(ty).remove_by_index(reader)?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => { - bail!("invalid stream or future handle"); - } + let table = self.id().get_mut(store).table_for_transmit(ty); + let (rep, _is_done) = match ty { + TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?, + TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?, + }; + let kind = match ty { + TransmitIndex::Stream(_) => TransmitKind::Stream, + TransmitIndex::Future(_) => TransmitKind::Future, }; - match state { - StreamFutureState::Read { .. } => {} - StreamFutureState::Write { .. } => { - bail!("passed write end to `{{stream|future}}.drop-readable`") - } - StreamFutureState::Busy => bail!("cannot drop busy stream or future"), - } let id = TableId::::new(rep); log::trace!("guest_drop_readable: drop reader {id:?}"); self.host_drop_reader(store, id, kind) @@ -3057,13 +2944,11 @@ impl Instance { // Here we reflect the newly created global concurrent error context state into the // component instance's locally tracked count, along with the appropriate key into the global // ref tracking data structures to enable later lookup - let local_tbl = &mut state.error_context_tables[ty]; - - assert!( - !local_tbl.has_handle(table_id.rep()), - "newly created error context state already tracked by component" - ); - let local_idx = local_tbl.insert(table_id.rep(), LocalErrorContextRefCount(1))?; + let local_idx = self + .id() + .get_mut(store) + .table_for_error_context(ty) + .error_context_insert(table_id.rep())?; Ok(local_idx) } @@ -3078,16 +2963,16 @@ impl Instance { debug_msg_address: u32, ) -> Result<()> { // Retrieve the error context and internal debug message - let state = self.concurrent_state_mut(store.0); - let (state_table_id_rep, _) = state - .error_context_tables - .get_mut(ty) - .context("error context table index present in (sub)component lookup during debug_msg")? - .get_mut_by_index(err_ctx_handle)?; + let handle_table_id_rep = self + .id() + .get_mut(store.0) + .table_for_error_context(ty) + .error_context_rep(err_ctx_handle)?; + let state = self.concurrent_state_mut(store.0); // Get the state associated with the error context let ErrorContextState { debug_msg } = - state.get_mut(TableId::::new(state_table_id_rep))?; + state.get_mut(TableId::::new(handle_table_id_rep))?; let debug_msg = debug_msg.clone(); let options = Options::new_index(store.0, self, options); @@ -3115,7 +3000,7 @@ impl Instance { ty: TypeFutureTableIndex, reader: u32, ) -> Result<()> { - self.guest_drop_readable(store, TableIndex::Future(ty), reader) + self.guest_drop_readable(store, TransmitIndex::Future(ty), reader) } /// Implements the `stream.drop-readable` intrinsic. @@ -3125,7 +3010,286 @@ impl Instance { ty: TypeStreamTableIndex, reader: u32, ) -> Result<()> { - self.guest_drop_readable(store, TableIndex::Stream(ty), reader) + self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader) + } +} + +impl ComponentInstance { + fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable { + let (tables, types) = self.guest_tables(); + let runtime_instance = match ty { + TransmitIndex::Stream(ty) => types[ty].instance, + TransmitIndex::Future(ty) => types[ty].instance, + }; + &mut tables[runtime_instance] + } + + fn table_for_error_context( + self: Pin<&mut Self>, + ty: TypeComponentLocalErrorContextTableIndex, + ) -> &mut HandleTable { + let (tables, types) = self.guest_tables(); + let runtime_instance = types[ty].instance; + &mut tables[runtime_instance] + } + + fn get_mut_by_index( + self: Pin<&mut Self>, + ty: TransmitIndex, + index: u32, + ) -> Result<(u32, &mut TransmitLocalState)> { + get_mut_by_index_from(self.table_for_transmit(ty), ty, index) + } + + /// Allocate a new future or stream and grant ownership of both the read and + /// write ends to the (sub-)component instance to which the specified + /// `TransmitIndex` belongs. + fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result { + let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?; + + let table = self.as_mut().table_for_transmit(ty); + let (read, write) = match ty { + TransmitIndex::Future(ty) => ( + table.future_insert_read(ty, read.rep())?, + table.future_insert_write(ty, write.rep())?, + ), + TransmitIndex::Stream(ty) => ( + table.stream_insert_read(ty, read.rep())?, + table.stream_insert_write(ty, write.rep())?, + ), + }; + Ok(ResourcePair { write, read }) + } + + /// Cancel a pending write for the specified stream or future from the guest. + fn guest_cancel_write( + mut self: Pin<&mut Self>, + ty: TransmitIndex, + writer: u32, + _async_: bool, + ) -> Result { + let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, writer)?; + let id = TableId::::new(rep); + log::trace!("guest cancel write {id:?} (handle {writer})"); + match state { + TransmitLocalState::Write { .. } => { + bail!("stream or future write cancelled when no write is pending") + } + TransmitLocalState::Read { .. } => { + bail!("passed read end to `{{stream|future}}.cancel-write`") + } + TransmitLocalState::Busy => { + *state = TransmitLocalState::Write { done: false }; + } + } + let state = self.concurrent_state_mut(); + let rep = state.get(id)?.state.rep(); + state.host_cancel_write(rep) + } + + /// Cancel a pending read for the specified stream or future from the guest. + fn guest_cancel_read( + mut self: Pin<&mut Self>, + ty: TransmitIndex, + reader: u32, + _async_: bool, + ) -> Result { + let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, reader)?; + let id = TableId::::new(rep); + log::trace!("guest cancel read {id:?} (handle {reader})"); + match state { + TransmitLocalState::Read { .. } => { + bail!("stream or future read cancelled when no read is pending") + } + TransmitLocalState::Write { .. } => { + bail!("passed write end to `{{stream|future}}.cancel-read`") + } + TransmitLocalState::Busy => { + *state = TransmitLocalState::Read { done: false }; + } + } + let state = self.concurrent_state_mut(); + let rep = state.get(id)?.state.rep(); + state.host_cancel_read(rep) + } + + /// Drop the specified error context. + pub(crate) fn error_context_drop( + mut self: Pin<&mut Self>, + ty: TypeComponentLocalErrorContextTableIndex, + error_context: u32, + ) -> Result<()> { + let local_handle_table = self.as_mut().table_for_error_context(ty); + + // Reduce the local (sub)component ref count, removing tracking if necessary + let (rep, local_ref_removed) = local_handle_table.error_context_drop(error_context)?; + + let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep); + + let state = self.concurrent_state_mut(); + let GlobalErrorContextRefCount(global_ref_count) = state + .global_error_context_ref_counts + .get_mut(&global_ref_count_idx) + .expect("retrieve concurrent state for error context during drop"); + + // Reduce the component-global ref count, removing tracking if necessary + assert!(*global_ref_count >= 1); + *global_ref_count -= 1; + if *global_ref_count == 0 { + assert!(local_ref_removed); + + state + .global_error_context_ref_counts + .remove(&global_ref_count_idx); + + state + .delete(TableId::::new(rep)) + .context("deleting component-global error context data")?; + } + + Ok(()) + } + + /// Transfer ownership of the specified stream or future read end from one + /// guest to another. + fn guest_transfer( + mut self: Pin<&mut Self>, + src_idx: u32, + src: TransmitIndex, + dst: TransmitIndex, + ) -> Result { + let src_table = self.as_mut().table_for_transmit(src); + let (rep, is_done) = match src { + TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?, + TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?, + }; + if is_done { + bail!("cannot lift after being notified that the writable end dropped"); + } + let dst_table = self.as_mut().table_for_transmit(dst); + match dst { + TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep), + TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep), + } + } + + /// Implements the `future.new` intrinsic. + pub(crate) fn future_new( + self: Pin<&mut Self>, + ty: TypeFutureTableIndex, + ) -> Result { + self.guest_new(TransmitIndex::Future(ty)) + } + + /// Implements the `future.cancel-write` intrinsic. + pub(crate) fn future_cancel_write( + self: Pin<&mut Self>, + ty: TypeFutureTableIndex, + async_: bool, + writer: u32, + ) -> Result { + self.guest_cancel_write(TransmitIndex::Future(ty), writer, async_) + .map(|result| result.encode()) + } + + /// Implements the `future.cancel-read` intrinsic. + pub(crate) fn future_cancel_read( + self: Pin<&mut Self>, + ty: TypeFutureTableIndex, + async_: bool, + reader: u32, + ) -> Result { + self.guest_cancel_read(TransmitIndex::Future(ty), reader, async_) + .map(|result| result.encode()) + } + + /// Implements the `stream.new` intrinsic. + pub(crate) fn stream_new( + self: Pin<&mut Self>, + ty: TypeStreamTableIndex, + ) -> Result { + self.guest_new(TransmitIndex::Stream(ty)) + } + + /// Implements the `stream.cancel-write` intrinsic. + pub(crate) fn stream_cancel_write( + self: Pin<&mut Self>, + ty: TypeStreamTableIndex, + async_: bool, + writer: u32, + ) -> Result { + self.guest_cancel_write(TransmitIndex::Stream(ty), writer, async_) + .map(|result| result.encode()) + } + + /// Implements the `stream.cancel-read` intrinsic. + pub(crate) fn stream_cancel_read( + self: Pin<&mut Self>, + ty: TypeStreamTableIndex, + async_: bool, + reader: u32, + ) -> Result { + self.guest_cancel_read(TransmitIndex::Stream(ty), reader, async_) + .map(|result| result.encode()) + } + + /// Transfer ownership of the specified future read end from one guest to + /// another. + pub(crate) fn future_transfer( + self: Pin<&mut Self>, + src_idx: u32, + src: TypeFutureTableIndex, + dst: TypeFutureTableIndex, + ) -> Result { + self.guest_transfer( + src_idx, + TransmitIndex::Future(src), + TransmitIndex::Future(dst), + ) + } + + /// Transfer ownership of the specified stream read end from one guest to + /// another. + pub(crate) fn stream_transfer( + self: Pin<&mut Self>, + src_idx: u32, + src: TypeStreamTableIndex, + dst: TypeStreamTableIndex, + ) -> Result { + self.guest_transfer( + src_idx, + TransmitIndex::Stream(src), + TransmitIndex::Stream(dst), + ) + } + + /// Copy the specified error context from one component to another. + pub(crate) fn error_context_transfer( + mut self: Pin<&mut Self>, + src_idx: u32, + src: TypeComponentLocalErrorContextTableIndex, + dst: TypeComponentLocalErrorContextTableIndex, + ) -> Result { + let rep = self + .as_mut() + .table_for_error_context(src) + .error_context_rep(src_idx)?; + let dst_idx = self + .as_mut() + .table_for_error_context(dst) + .error_context_insert(rep)?; + + // Update the global (cross-subcomponent) count for error contexts + // as the new component has essentially created a new reference that will + // be dropped/handled independently + let global_ref_count = self + .concurrent_state_mut() + .global_error_context_ref_counts + .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep)) + .context("global ref count present for existing (sub)component error context")?; + global_ref_count.0 += 1; + + Ok(dst_idx) } } @@ -3196,14 +3360,6 @@ impl ConcurrentState { waitable.set_event(self, Some(event)) } - fn get_mut_by_index( - &mut self, - ty: TableIndex, - index: u32, - ) -> Result<(u32, &mut StreamFutureState)> { - get_mut_by_index_from(self.state_table(ty), ty, index) - } - /// Allocate a new future or stream, including the `TransmitState` and the /// `TransmitHandle`s corresponding to the read and write ends. fn new_transmit(&mut self) -> Result<(TableId, TableId)> { @@ -3236,30 +3392,6 @@ impl ConcurrentState { Ok(()) } - fn state_table(&mut self, ty: TableIndex) -> &mut StateTable { - let runtime_instance = match ty { - TableIndex::Stream(ty) => self.component.types()[ty].instance, - TableIndex::Future(ty) => self.component.types()[ty].instance, - }; - &mut self.waitable_tables[runtime_instance] - } - - /// Allocate a new future or stream and grant ownership of both the read and - /// write ends to the (sub-)component instance to which the specified - /// `TableIndex` belongs. - fn guest_new(&mut self, ty: TableIndex) -> Result { - let (write, read) = self.new_transmit()?; - let read = self.state_table(ty).insert( - read.rep(), - waitable_state(ty, StreamFutureState::Read { done: false }), - )?; - let write = self.state_table(ty).insert( - write.rep(), - waitable_state(ty, StreamFutureState::Write { done: false }), - )?; - Ok(ResourcePair { write, read }) - } - /// Cancel a pending stream or future write from the host. /// /// # Arguments @@ -3349,301 +3481,6 @@ impl ConcurrentState { Ok(code) } - - /// Cancel a pending write for the specified stream or future from the guest. - fn guest_cancel_write( - &mut self, - ty: TableIndex, - writer: u32, - _async_: bool, - ) -> Result { - let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = - self.state_table(ty).get_mut_by_index(writer)? - else { - bail!("invalid stream or future handle"); - }; - let id = TableId::::new(rep); - log::trace!("guest cancel write {id:?} (handle {writer})"); - match state { - StreamFutureState::Write { .. } => { - bail!("stream or future write cancelled when no write is pending") - } - StreamFutureState::Read { .. } => { - bail!("passed read end to `{{stream|future}}.cancel-write`") - } - StreamFutureState::Busy => { - *state = StreamFutureState::Write { done: false }; - } - } - let rep = self.get(id)?.state.rep(); - self.host_cancel_write(rep) - } - - /// Cancel a pending read for the specified stream or future from the guest. - fn guest_cancel_read( - &mut self, - ty: TableIndex, - reader: u32, - _async_: bool, - ) -> Result { - let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = - self.state_table(ty).get_mut_by_index(reader)? - else { - bail!("invalid stream or future handle"); - }; - let id = TableId::::new(rep); - log::trace!("guest cancel read {id:?} (handle {reader})"); - match state { - StreamFutureState::Read { .. } => { - bail!("stream or future read cancelled when no read is pending") - } - StreamFutureState::Write { .. } => { - bail!("passed write end to `{{stream|future}}.cancel-read`") - } - StreamFutureState::Busy => { - *state = StreamFutureState::Read { done: false }; - } - } - let rep = self.get(id)?.state.rep(); - self.host_cancel_read(rep) - } - - /// Drop the specified error context. - pub(crate) fn error_context_drop( - &mut self, - ty: TypeComponentLocalErrorContextTableIndex, - error_context: u32, - ) -> Result<()> { - let local_state_table = self - .error_context_tables - .get_mut(ty) - .context("error context table index present in (sub)component table during drop")?; - - // Reduce the local (sub)component ref count, removing tracking if necessary - let (rep, local_ref_removed) = { - let (rep, LocalErrorContextRefCount(local_ref_count)) = - local_state_table.get_mut_by_index(error_context)?; - assert!(*local_ref_count > 0); - *local_ref_count -= 1; - let mut local_ref_removed = false; - if *local_ref_count == 0 { - local_ref_removed = true; - local_state_table - .remove_by_index(error_context) - .context("removing error context from component-local tracking")?; - } - (rep, local_ref_removed) - }; - - let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep); - - let GlobalErrorContextRefCount(global_ref_count) = self - .global_error_context_ref_counts - .get_mut(&global_ref_count_idx) - .expect("retrieve concurrent state for error context during drop"); - - // Reduce the component-global ref count, removing tracking if necessary - assert!(*global_ref_count >= 1); - *global_ref_count -= 1; - if *global_ref_count == 0 { - assert!(local_ref_removed); - - self.global_error_context_ref_counts - .remove(&global_ref_count_idx); - - self.delete(TableId::::new(rep)) - .context("deleting component-global error context data")?; - } - - Ok(()) - } - - /// Transfer ownership of the specified stream or future read end from one - /// guest to another. - fn guest_transfer( - &mut self, - src_idx: u32, - src: U, - src_instance: RuntimeComponentInstanceIndex, - dst: U, - dst_instance: RuntimeComponentInstanceIndex, - match_state: impl Fn(&mut WaitableState) -> Result<(U, &mut StreamFutureState)>, - make_state: impl Fn(U, StreamFutureState) -> WaitableState, - ) -> Result { - let src_table = &mut self.waitable_tables[src_instance]; - let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?; - let (src_ty, _) = match_state(src_state)?; - if src_ty != src { - bail!("invalid future handle"); - } - - let src_table = &mut self.waitable_tables[src_instance]; - let (rep, src_state) = src_table.get_mut_by_index(src_idx)?; - let (_, src_state) = match_state(src_state)?; - - match src_state { - StreamFutureState::Read { done: true } => { - bail!("cannot lift stream after being notified that the writable end dropped") - } - StreamFutureState::Read { done: false } => { - src_table.remove_by_index(src_idx)?; - - let dst_table = &mut self.waitable_tables[dst_instance]; - dst_table.insert( - rep, - make_state(dst, StreamFutureState::Read { done: false }), - ) - } - StreamFutureState::Write { .. } => { - bail!("cannot transfer write end of stream or future") - } - StreamFutureState::Busy => bail!("cannot transfer busy stream or future"), - } - } - - /// Implements the `future.new` intrinsic. - pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result { - self.guest_new(TableIndex::Future(ty)) - } - - /// Implements the `future.cancel-write` intrinsic. - pub(crate) fn future_cancel_write( - &mut self, - ty: TypeFutureTableIndex, - async_: bool, - writer: u32, - ) -> Result { - self.guest_cancel_write(TableIndex::Future(ty), writer, async_) - .map(|result| result.encode()) - } - - /// Implements the `future.cancel-read` intrinsic. - pub(crate) fn future_cancel_read( - &mut self, - ty: TypeFutureTableIndex, - async_: bool, - reader: u32, - ) -> Result { - self.guest_cancel_read(TableIndex::Future(ty), reader, async_) - .map(|result| result.encode()) - } - - /// Implements the `stream.new` intrinsic. - pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result { - self.guest_new(TableIndex::Stream(ty)) - } - - /// Implements the `stream.cancel-write` intrinsic. - pub(crate) fn stream_cancel_write( - &mut self, - ty: TypeStreamTableIndex, - async_: bool, - writer: u32, - ) -> Result { - self.guest_cancel_write(TableIndex::Stream(ty), writer, async_) - .map(|result| result.encode()) - } - - /// Implements the `stream.cancel-read` intrinsic. - pub(crate) fn stream_cancel_read( - &mut self, - ty: TypeStreamTableIndex, - async_: bool, - reader: u32, - ) -> Result { - self.guest_cancel_read(TableIndex::Stream(ty), reader, async_) - .map(|result| result.encode()) - } - - /// Transfer ownership of the specified future read end from one guest to - /// another. - pub(crate) fn future_transfer( - &mut self, - src_idx: u32, - src: TypeFutureTableIndex, - dst: TypeFutureTableIndex, - ) -> Result { - self.guest_transfer( - src_idx, - src, - self.component.types()[src].instance, - dst, - self.component.types()[dst].instance, - |state| { - if let WaitableState::Future(ty, state) = state { - Ok((*ty, state)) - } else { - Err(anyhow!("invalid future handle")) - } - }, - WaitableState::Future, - ) - } - - /// Transfer ownership of the specified stream read end from one guest to - /// another. - pub(crate) fn stream_transfer( - &mut self, - src_idx: u32, - src: TypeStreamTableIndex, - dst: TypeStreamTableIndex, - ) -> Result { - self.guest_transfer( - src_idx, - src, - self.component.types()[src].instance, - dst, - self.component.types()[dst].instance, - |state| { - if let WaitableState::Stream(ty, state) = state { - Ok((*ty, state)) - } else { - Err(anyhow!("invalid stream handle")) - } - }, - WaitableState::Stream, - ) - } - - /// Copy the specified error context from one component to another. - pub(crate) fn error_context_transfer( - &mut self, - src_idx: u32, - src: TypeComponentLocalErrorContextTableIndex, - dst: TypeComponentLocalErrorContextTableIndex, - ) -> Result { - let (rep, _) = { - let rep = self - .error_context_tables - .get_mut(src) - .context("error context table index present in (sub)component lookup")? - .get_mut_by_index(src_idx)?; - rep - }; - let dst = self - .error_context_tables - .get_mut(dst) - .context("error context table index present in (sub)component lookup")?; - - // Update the component local for the destination - let updated_count = if let Some((dst_idx, count)) = dst.get_mut_by_rep(rep) { - (*count).0 += 1; - dst_idx - } else { - dst.insert(rep, LocalErrorContextRefCount(1))? - }; - - // Update the global (cross-subcomponent) count for error contexts - // as the new component has essentially created a new reference that will - // be dropped/handled independently - let global_ref_count = self - .global_error_context_ref_counts - .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep)) - .context("global ref count present for existing (sub)component error context")?; - global_ref_count.0 += 1; - - Ok(updated_count) - } } pub(crate) struct ResourcePair { diff --git a/crates/wasmtime/src/runtime/component/concurrent/states.rs b/crates/wasmtime/src/runtime/component/concurrent/states.rs deleted file mode 100644 index 51d821c77137..000000000000 --- a/crates/wasmtime/src/runtime/component/concurrent/states.rs +++ /dev/null @@ -1,143 +0,0 @@ -// FIXME: This file was copied from vm/component/resources.rs and should be -// deduplicated as part of -// https://github.com/bytecodealliance/wasmtime/issues/11189. - -use alloc::vec::Vec; -use anyhow::{Result, bail}; -use core::mem; - -/// The maximum handle value is specified in -/// -/// currently and keeps the upper bit free for use in the component. -const MAX_HANDLE: u32 = 1 << 30; - -enum Slot { - Free { next: u32 }, - Occupied { rep: u32, state: T }, -} - -pub struct StateTable { - next: u32, - slots: Vec>, - // TODO: This is a sparse table (where zero means "no entry"); it might make - // more sense to use a `HashMap` here, but we'd need one that's - // no_std-compatible. A `BTreeMap` might also be appropriate if we restrict - // ourselves to `alloc::collections`. - reps_to_indexes: Vec, -} - -impl Default for StateTable { - fn default() -> Self { - Self { - next: 0, - slots: Vec::new(), - reps_to_indexes: Vec::new(), - } - } -} - -impl StateTable { - /// Returns whether or not this table is empty. - pub fn is_empty(&self) -> bool { - self.slots - .iter() - .all(|slot| matches!(slot, Slot::Free { .. })) - } - - pub fn insert(&mut self, rep: u32, state: T) -> Result { - if matches!(self - .reps_to_indexes - .get(usize::try_from(rep).unwrap()), Some(idx) if *idx != 0) - { - bail!("rep {rep} already exists in this table"); - } - - let next = self.next as usize; - if next == self.slots.len() { - self.slots.push(Slot::Free { - next: self.next.checked_add(1).unwrap(), - }); - } - let ret = self.next; - self.next = match mem::replace(&mut self.slots[next], Slot::Occupied { rep, state }) { - Slot::Free { next } => next, - _ => unreachable!(), - }; - // The component model reserves index 0 as never allocatable so add one - // to the table index to start the numbering at 1 instead. Also note - // that the component model places an upper-limit per-table on the - // maximum allowed index. - let ret = ret + 1; - if ret >= MAX_HANDLE { - bail!("cannot allocate another handle: index overflow"); - } - - let rep = usize::try_from(rep).unwrap(); - if self.reps_to_indexes.len() <= rep { - self.reps_to_indexes.resize(rep.checked_add(1).unwrap(), 0); - } - - self.reps_to_indexes[rep] = ret; - - Ok(ret) - } - - fn handle_index_to_table_index(&self, idx: u32) -> Option { - // NB: `idx` is decremented by one to account for the `+1` above during - // allocation. - let idx = idx.checked_sub(1)?; - usize::try_from(idx).ok() - } - - fn get_mut(&mut self, idx: u32) -> Result<&mut Slot> { - let slot = self - .handle_index_to_table_index(idx) - .and_then(|i| self.slots.get_mut(i)); - match slot { - None | Some(Slot::Free { .. }) => bail!("unknown handle index {idx}"), - Some(slot) => Ok(slot), - } - } - - pub fn has_handle(&self, idx: u32) -> bool { - matches!( - self.handle_index_to_table_index(idx) - .and_then(|i| self.slots.get(i)), - Some(Slot::Occupied { .. }) - ) - } - - pub fn get_mut_by_index(&mut self, idx: u32) -> Result<(u32, &mut T)> { - let slot = self - .handle_index_to_table_index(idx) - .and_then(|i| self.slots.get_mut(i)); - match slot { - None | Some(Slot::Free { .. }) => bail!("unknown handle index {idx}"), - Some(Slot::Occupied { rep, state }) => Ok((*rep, state)), - } - } - - pub fn get_mut_by_rep(&mut self, rep: u32) -> Option<(u32, &mut T)> { - let index = *self.reps_to_indexes.get(usize::try_from(rep).unwrap())?; - if index > 0 { - let (_, state) = self.get_mut_by_index(index).unwrap(); - Some((index, state)) - } else { - None - } - } - - pub fn remove_by_index(&mut self, idx: u32) -> Result<(u32, T)> { - let to_fill = Slot::Free { next: self.next }; - let Slot::Occupied { rep, state } = mem::replace(self.get_mut(idx)?, to_fill) else { - unreachable!() - }; - self.next = idx - 1; - { - let rep = usize::try_from(rep).unwrap(); - assert_eq!(idx, self.reps_to_indexes[rep]); - self.reps_to_indexes[rep] = 0; - } - Ok((rep, state)) - } -} diff --git a/crates/wasmtime/src/runtime/component/concurrent/table.rs b/crates/wasmtime/src/runtime/component/concurrent/table.rs index 382eb07e6add..7829c05786c5 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/table.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/table.rs @@ -172,11 +172,16 @@ impl TableEntry { impl Table { /// Create an empty table pub fn new() -> Self { - Self { + let mut me = Self { entries: Vec::new(), free_head: None, debug: false, - } + }; + + // Reserve 0 as an invalid entry. + me.push(Tombstone).unwrap(); + + me } /// Returns whether or not this table is empty. diff --git a/crates/wasmtime/src/runtime/component/func/options.rs b/crates/wasmtime/src/runtime/component/func/options.rs index eaa6b27cd733..b17a71d1b9b2 100644 --- a/crates/wasmtime/src/runtime/component/func/options.rs +++ b/crates/wasmtime/src/runtime/component/func/options.rs @@ -3,7 +3,7 @@ use crate::component::resources::{HostResourceData, HostResourceIndex, HostResou use crate::component::{Instance, ResourceType}; use crate::prelude::*; use crate::runtime::vm::component::{ - CallContexts, ComponentInstance, InstanceFlags, ResourceTable, ResourceTables, + CallContexts, ComponentInstance, HandleTable, InstanceFlags, ResourceTables, }; use crate::runtime::vm::{VMFuncRef, VMMemoryDefinition}; use crate::store::{StoreId, StoreOpaque}; @@ -473,7 +473,7 @@ pub struct LiftContext<'a> { instance: Pin<&'a mut ComponentInstance>, instance_handle: Instance, - host_table: &'a mut ResourceTable, + host_table: &'a mut HandleTable, host_resource_data: &'a mut HostResourceData, calls: &'a mut CallContexts, @@ -490,7 +490,7 @@ impl<'a> LiftContext<'a> { ) -> LiftContext<'a> { // From `&mut StoreOpaque` provided the goal here is to project out // three different disjoint fields owned by the store: memory, - // `CallContexts`, and `ResourceTable`. There's no native API for that + // `CallContexts`, and `HandleTable`. There's no native API for that // so it's hacked around a bit. This unsafe pointer cast could be fixed // with more methods in more places, but it doesn't seem worth doing it // at this time. diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index a2a4289cd4ad..cea1f82d1cef 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -395,7 +395,7 @@ pub struct StoreOpaque { /// and calls. These also interact with the `ResourceAny` type and its /// internal representation. #[cfg(feature = "component-model")] - component_host_table: vm::component::ResourceTable, + component_host_table: vm::component::HandleTable, #[cfg(feature = "component-model")] component_calls: vm::component::CallContexts, #[cfg(feature = "component-model")] @@ -2032,7 +2032,7 @@ at https://bytecodealliance.org/security. &mut self, ) -> ( &mut vm::component::CallContexts, - &mut vm::component::ResourceTable, + &mut vm::component::HandleTable, &mut crate::component::HostResourceData, ) { ( @@ -2059,7 +2059,7 @@ at https://bytecodealliance.org/security. instance: crate::component::Instance, ) -> ( &mut vm::component::CallContexts, - &mut vm::component::ResourceTable, + &mut vm::component::HandleTable, &mut crate::component::HostResourceData, Pin<&mut vm::component::ComponentInstance>, ) { diff --git a/crates/wasmtime/src/runtime/vm/component.rs b/crates/wasmtime/src/runtime/vm/component.rs index b0d2e163aa8c..f9c5de3e9906 100644 --- a/crates/wasmtime/src/runtime/vm/component.rs +++ b/crates/wasmtime/src/runtime/vm/component.rs @@ -31,14 +31,16 @@ use wasmtime_environ::{HostPtr, PrimaryMap, VMSharedTypeIndex}; )] const INVALID_PTR: usize = 0xdead_dead_beef_beef_u64 as usize; +mod handle_table; mod libcalls; mod resources; +pub use self::handle_table::{HandleTable, RemovedResource}; +#[cfg(feature = "component-model-async")] +pub use self::handle_table::{TransmitLocalState, Waitable}; #[cfg(feature = "component-model-async")] pub use self::resources::CallContext; -pub use self::resources::{ - CallContexts, ResourceTable, ResourceTables, TypedResource, TypedResourceIndex, -}; +pub use self::resources::{CallContexts, ResourceTables, TypedResource, TypedResourceIndex}; #[cfg(feature = "component-model-async")] use crate::component::concurrent; @@ -78,11 +80,13 @@ pub struct ComponentInstance { // of the component can be thrown away (theoretically). component: Component, - /// State of resources for this component. + /// State of handles (e.g. resources, waitables, etc.) for this component. /// - /// This is paired with other information to create a `ResourceTables` which - /// is how this field is manipulated. - instance_resource_tables: PrimaryMap, + /// For resource handles, this is paired with other information to create a + /// `ResourceTables` and manipulated through that. For other handles, this + /// is used directly to translate guest handles to host representations and + /// vice-versa. + instance_handle_tables: PrimaryMap, /// State related to async for this component, e.g. futures, streams, tasks, /// etc. @@ -273,16 +277,16 @@ impl ComponentInstance { ) -> OwnedComponentInstance { let offsets = VMComponentOffsets::new(HostPtr, component.env_component()); let num_instances = component.env_component().num_runtime_component_instances; - let mut instance_resource_tables = + let mut instance_handle_tables = PrimaryMap::with_capacity(num_instances.try_into().unwrap()); for _ in 0..num_instances { - instance_resource_tables.push(ResourceTable::default()); + instance_handle_tables.push(HandleTable::default()); } let mut ret = OwnedInstance::new(ComponentInstance { id, offsets, - instance_resource_tables, + instance_handle_tables, instances: PrimaryMap::with_capacity( component .env_component() @@ -734,14 +738,14 @@ impl ComponentInstance { pub fn guest_tables( self: Pin<&mut Self>, ) -> ( - &mut PrimaryMap, + &mut PrimaryMap, &ComponentTypes, ) { // safety: we've chosen the `pin` guarantee of `self` to not apply to // the map returned. unsafe { let me = self.get_unchecked_mut(); - (&mut me.instance_resource_tables, me.component.types()) + (&mut me.instance_handle_tables, me.component.types()) } } diff --git a/crates/wasmtime/src/runtime/vm/component/handle_table.rs b/crates/wasmtime/src/runtime/vm/component/handle_table.rs new file mode 100644 index 000000000000..9e777a37cc55 --- /dev/null +++ b/crates/wasmtime/src/runtime/vm/component/handle_table.rs @@ -0,0 +1,665 @@ +use super::{TypedResource, TypedResourceIndex}; +use alloc::vec::Vec; +use anyhow::{Result, bail}; +use core::mem; +use wasmtime_environ::component::{TypeFutureTableIndex, TypeStreamTableIndex}; + +/// The maximum handle value is specified in +/// +/// currently and keeps the upper bits free for use in the component and ABI. +const MAX_HANDLE: u32 = 1 << 28; + +/// Represents the state of a stream or future handle from the perspective of a +/// given component instance. +#[derive(Debug, Eq, PartialEq)] +pub enum TransmitLocalState { + /// The write end of the stream or future. + Write { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, + /// The read end of the stream or future. + Read { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, + /// A read or write is in progress. + Busy, +} + +/// Return value from [`HandleTable::remove_resource`]. +pub enum RemovedResource { + /// An `own` resource was removed with the specified `rep` + Own { rep: u32 }, + /// A `borrow` resource was removed originally created within `scope`. + Borrow { scope: usize }, +} + +/// Different kinds of waitables returned by [`HandleTable::waitable_rep`]. +pub enum Waitable { + Subtask { is_host: bool }, + Future, + Stream, +} + +enum Slot { + Free { + next: u32, + }, + + /// Represents an owned resource handle with the listed representation. + /// + /// The `lend_count` tracks how many times this has been lent out as a + /// `borrow` and if nonzero this can't be removed. + ResourceOwn { + resource: TypedResource, + lend_count: u32, + }, + + /// Represents a borrowed resource handle connected to the `scope` + /// provided. + /// + /// The `rep` is listed and dropping this borrow will decrement the borrow + /// count of the `scope`. + ResourceBorrow { + resource: TypedResource, + scope: usize, + }, + + /// Represents a host task handle. + HostTask { + rep: u32, + }, + + /// Represents a guest task handle. + GuestTask { + rep: u32, + }, + + /// Represents a stream handle. + Stream { + ty: TypeStreamTableIndex, + rep: u32, + state: TransmitLocalState, + }, + + /// Represents a future handle. + Future { + ty: TypeFutureTableIndex, + rep: u32, + state: TransmitLocalState, + }, + + /// Represents a waitable-set handle. + WaitableSet { + rep: u32, + }, + + /// Represents an error-context handle. + ErrorContext { + /// Number of references held by the (sub-)component. + /// + /// This does not include the number of references which might be held + /// by other (sub-)components. + local_ref_count: u32, + + rep: u32, + }, +} + +impl Slot { + fn rep_for_reps_to_indexes(&self) -> Option { + match self { + Self::HostTask { rep } + | Self::GuestTask { rep } + | Self::Stream { rep, .. } + | Self::Future { rep, .. } + | Self::ErrorContext { rep, .. } => Some(*rep), + _ => None, + } + } +} + +pub struct HandleTable { + next: u32, + slots: Vec, + // TODO: This is a sparse table (where zero means "no entry"); it might make + // more sense to use a `HashMap` here, but we'd need one that's + // no_std-compatible. A `BTreeMap` might also be appropriate if we restrict + // ourselves to `alloc::collections`. + reps_to_indexes: Vec, +} + +impl Default for HandleTable { + fn default() -> Self { + Self { + next: 0, + slots: Vec::new(), + reps_to_indexes: Vec::new(), + } + } +} + +impl HandleTable { + /// Returns whether or not this table is empty. + pub fn is_empty(&self) -> bool { + self.slots + .iter() + .all(|slot| matches!(slot, Slot::Free { .. })) + } + + fn insert(&mut self, slot: Slot) -> Result { + let rep_usize = slot + .rep_for_reps_to_indexes() + .map(|r| usize::try_from(r).unwrap()); + + if let Some(rep) = rep_usize { + if matches!(self.reps_to_indexes.get(rep), Some(idx) if *idx != 0) { + bail!("rep {rep} already exists in this table"); + } + } + + let next = self.next as usize; + if next == self.slots.len() { + self.slots.push(Slot::Free { + next: self.next.checked_add(1).unwrap(), + }); + } + let ret = self.next; + self.next = match mem::replace(&mut self.slots[next], slot) { + Slot::Free { next } => next, + _ => unreachable!(), + }; + // The component model reserves index 0 as never allocatable so add one + // to the table index to start the numbering at 1 instead. Also note + // that the component model places an upper-limit per-table on the + // maximum allowed index. + let ret = ret + 1; + if ret >= MAX_HANDLE { + bail!("cannot allocate another handle: index overflow"); + } + + if let Some(rep) = rep_usize { + if self.reps_to_indexes.len() <= rep { + self.reps_to_indexes.resize(rep.checked_add(1).unwrap(), 0); + } + self.reps_to_indexes[rep] = ret; + } + + Ok(ret) + } + + fn remove(&mut self, idx: u32) -> Result<()> { + let to_fill = Slot::Free { next: self.next }; + let slot = self.get_mut(idx)?; + let rep = slot.rep_for_reps_to_indexes(); + *slot = to_fill; + self.next = idx - 1; + + if let Some(rep) = rep { + let rep = usize::try_from(rep).unwrap(); + assert_eq!(idx, self.reps_to_indexes[rep]); + self.reps_to_indexes[rep] = 0; + } + Ok(()) + } + + fn handle_index_to_table_index(&self, idx: u32) -> Option { + // NB: `idx` is decremented by one to account for the `+1` above during + // allocation. + let idx = idx.checked_sub(1)?; + usize::try_from(idx).ok() + } + + fn get_mut(&mut self, idx: u32) -> Result<&mut Slot> { + let slot = self + .handle_index_to_table_index(idx) + .and_then(|i| self.slots.get_mut(i)); + match slot { + None | Some(Slot::Free { .. }) => bail!("unknown handle index {idx}"), + Some(slot) => Ok(slot), + } + } + + fn get_mut_by_rep(&mut self, rep: u32) -> Option<(u32, &mut Slot)> { + let index = *self.reps_to_indexes.get(usize::try_from(rep).unwrap())?; + if index > 0 { + let slot = self.get_mut(index).unwrap(); + Some((index, slot)) + } else { + None + } + } + + /// Inserts a new `own` resource into this table whose type/rep are + /// specified by `resource`. + pub fn resource_own_insert(&mut self, resource: TypedResource) -> Result { + self.insert(Slot::ResourceOwn { + resource, + lend_count: 0, + }) + } + + /// Inserts a new `borrow` resource into this table whose type/rep are + /// specified by `resource`. The `scope` specified is used by + /// `CallContexts` to manage lending information. + pub fn resource_borrow_insert(&mut self, resource: TypedResource, scope: usize) -> Result { + self.insert(Slot::ResourceBorrow { resource, scope }) + } + + /// Returns the internal "rep" of the resource specified by `idx`. + /// + /// Returns an error if `idx` is out-of-bounds or doesn't point to a + /// resource of the appropriate type. + pub fn resource_rep(&mut self, idx: TypedResourceIndex) -> Result { + match self.get_mut(idx.raw_index())? { + Slot::ResourceOwn { resource, .. } | Slot::ResourceBorrow { resource, .. } => { + resource.rep(&idx) + } + _ => bail!("index is not a resource"), + } + } + + /// Accesses the "rep" of the resource pointed to by `idx` as part of a + /// lending operation. + /// + /// This will increase `lend_count` for owned resources and must be paired + /// with a `resource_undo_lend` below later on (managed by `CallContexts`). + /// + /// Upon success returns the "rep" plus whether the borrow came from an + /// `own` handle. + pub fn resource_lend(&mut self, idx: TypedResourceIndex) -> Result<(u32, bool)> { + match self.get_mut(idx.raw_index())? { + Slot::ResourceOwn { + resource, + lend_count, + } => { + let rep = resource.rep(&idx)?; + *lend_count = lend_count.checked_add(1).unwrap(); + Ok((rep, true)) + } + Slot::ResourceBorrow { resource, .. } => Ok((resource.rep(&idx)?, false)), + _ => bail!("index {} is not a resource", idx.raw_index()), + } + } + + /// For `own` resources that were borrowed in `resource_lend`, undoes the + /// lending operation. + pub fn resource_undo_lend(&mut self, idx: TypedResourceIndex) -> Result<()> { + match self.get_mut(idx.raw_index())? { + Slot::ResourceOwn { lend_count, .. } => { + *lend_count -= 1; + Ok(()) + } + _ => bail!("index {} is not an own resource", idx.raw_index()), + } + } + + /// Removes the resource specified by `idx` from the table. + /// + /// This can fail if `idx` doesn't point to a resource, points to a + /// borrowed resource, or points to a resource of the wrong type. + pub fn remove_resource(&mut self, idx: TypedResourceIndex) -> Result { + let ret = match self.get_mut(idx.raw_index())? { + Slot::ResourceOwn { + resource, + lend_count, + } => { + if *lend_count != 0 { + bail!("cannot remove owned resource while borrowed") + } + RemovedResource::Own { + rep: resource.rep(&idx)?, + } + } + Slot::ResourceBorrow { resource, scope } => { + // Ensure the drop is done with the right type + resource.rep(&idx)?; + RemovedResource::Borrow { scope: *scope } + } + _ => bail!("index {} is not a resource", idx.raw_index()), + }; + self.remove(idx.raw_index())?; + Ok(ret) + } + + /// Inserts a readable-end stream of type `ty` and with the specified `rep` + /// into this table. + /// + /// Returns the table-local index of the stream. + pub fn stream_insert_read(&mut self, ty: TypeStreamTableIndex, rep: u32) -> Result { + self.insert(Slot::Stream { + rep, + ty, + state: TransmitLocalState::Read { done: false }, + }) + } + + /// Inserts a writable-end stream of type `ty` and with the specified `rep` + /// into this table. + /// + /// Returns the table-local index of the stream. + pub fn stream_insert_write(&mut self, ty: TypeStreamTableIndex, rep: u32) -> Result { + self.insert(Slot::Stream { + rep, + ty, + state: TransmitLocalState::Write { done: false }, + }) + } + + /// Returns the `rep` and `state` associated with the stream pointed to by + /// `idx`. + /// + /// Returns an error if `idx` is out-of-bounds or doesn't point to a stream + /// of type `ty`. + pub fn stream_rep( + &mut self, + expected_ty: TypeStreamTableIndex, + idx: u32, + ) -> Result<(u32, &mut TransmitLocalState)> { + match self.get_mut(idx)? { + Slot::Stream { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a stream of a different type"); + } + Ok((*rep, state)) + } + _ => bail!("handle is not a stream"), + } + } + + /// Removes the stream handle from `idx`, returning its `rep`. + /// + /// The stream must have the type `ty` and additionally be in a state + /// suitable for removal. + /// + /// Returns the `rep` for the stream along with whether the stream was + /// "done" or the writable end was witnessed as being done. + pub fn stream_remove_readable( + &mut self, + expected_ty: TypeStreamTableIndex, + idx: u32, + ) -> Result<(u32, bool)> { + let ret = match self.get_mut(idx)? { + Slot::Stream { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a stream of a different type"); + } + let is_done = match state { + TransmitLocalState::Read { done } => *done, + TransmitLocalState::Write { .. } => { + bail!("handle is not a readable end of a stream") + } + TransmitLocalState::Busy => bail!("cannot remove busy stream"), + }; + (*rep, is_done) + } + _ => bail!("handle is not a stream"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Removes the writable stream handle from `idx`, returning its `rep`. + pub fn stream_remove_writable( + &mut self, + expected_ty: TypeStreamTableIndex, + idx: u32, + ) -> Result { + let ret = match self.get_mut(idx)? { + Slot::Stream { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a stream of a different type"); + } + match state { + TransmitLocalState::Write { .. } => {} + TransmitLocalState::Read { .. } => { + bail!("passed read end to `stream.drop-writable`") + } + TransmitLocalState::Busy => bail!("cannot drop busy stream"), + } + *rep + } + _ => bail!("handle is not a stream"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Inserts a readable-end future of type `ty` and with the specified `rep` + /// into this table. + /// + /// Returns the table-local index of the future. + pub fn future_insert_read(&mut self, ty: TypeFutureTableIndex, rep: u32) -> Result { + self.insert(Slot::Future { + rep, + ty, + state: TransmitLocalState::Read { done: false }, + }) + } + + /// Inserts a writable-end future of type `ty` and with the specified `rep` + /// into this table. + /// + /// Returns the table-local index of the future. + pub fn future_insert_write(&mut self, ty: TypeFutureTableIndex, rep: u32) -> Result { + self.insert(Slot::Future { + rep, + ty, + state: TransmitLocalState::Write { done: false }, + }) + } + + /// Returns the `rep` and `state` associated with the future pointed to by + /// `idx`. + /// + /// Returns an error if `idx` is out-of-bounds or doesn't point to a future + /// of type `ty`. + pub fn future_rep( + &mut self, + expected_ty: TypeFutureTableIndex, + idx: u32, + ) -> Result<(u32, &mut TransmitLocalState)> { + match self.get_mut(idx)? { + Slot::Future { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a future of a different type"); + } + Ok((*rep, state)) + } + _ => bail!("handle is not a future"), + } + } + + /// Removes the future handle from `idx`, returning its `rep`. + /// + /// The future must have the type `ty` and additionally be in a state + /// suitable for removal. + /// + /// Returns the `rep` for the future along with whether the future was + /// "done" or the writable end was witnessed as being done. + pub fn future_remove_readable( + &mut self, + expected_ty: TypeFutureTableIndex, + idx: u32, + ) -> Result<(u32, bool)> { + let ret = match self.get_mut(idx)? { + Slot::Future { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a future of a different type"); + } + let is_done = match state { + TransmitLocalState::Read { done } => *done, + TransmitLocalState::Write { .. } => { + bail!("handle is not a readable end of a future") + } + TransmitLocalState::Busy => bail!("cannot remove busy future"), + }; + (*rep, is_done) + } + _ => bail!("handle is not a future"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Removes the writable future handle from `idx`, returning its `rep`. + pub fn future_remove_writable( + &mut self, + expected_ty: TypeFutureTableIndex, + idx: u32, + ) -> Result { + let ret = match self.get_mut(idx)? { + Slot::Future { rep, ty, state } => { + if *ty != expected_ty { + bail!("handle is a future of a different type"); + } + match state { + TransmitLocalState::Write { .. } => {} + TransmitLocalState::Read { .. } => { + bail!("passed read end to `future.drop-writable`") + } + TransmitLocalState::Busy => bail!("cannot drop busy future"), + } + *rep + } + _ => bail!("handle is not a future"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Inserts the error-context `rep` into this table, returning the index it + /// now resides at. + pub fn error_context_insert(&mut self, rep: u32) -> Result { + if let Some(( + dst_idx, + Slot::ErrorContext { + local_ref_count, .. + }, + )) = self.get_mut_by_rep(rep) + { + *local_ref_count += 1; + return Ok(dst_idx); + } + + self.insert(Slot::ErrorContext { + rep, + local_ref_count: 1, + }) + } + + /// Returns the `rep` of an error-context pointed to by `idx`. + pub fn error_context_rep(&mut self, idx: u32) -> Result { + match self.get_mut(idx)? { + Slot::ErrorContext { rep, .. } => Ok(*rep), + _ => bail!("handle is not an error-context"), + } + } + + /// Drops the error-context pointed to by `idx`. + /// + /// Returns the internal `rep` and whether the reference count has reached + /// 0 meaning it was fully removed. + pub fn error_context_drop(&mut self, idx: u32) -> Result<(u32, bool)> { + let rep = match self.get_mut(idx)? { + Slot::ErrorContext { + rep, + local_ref_count, + } => { + *local_ref_count -= 1; + if *local_ref_count > 0 { + return Ok((*rep, false)); + } + *rep + } + _ => bail!("handle is not an error-context"), + }; + self.remove(idx)?; + Ok((rep, true)) + } + + /// Inserts `rep` as a guest subtask into this table. + pub fn subtask_insert_guest(&mut self, rep: u32) -> Result { + self.insert(Slot::GuestTask { rep }) + } + + /// Inserts `rep` as a host subtask into this table. + pub fn subtask_insert_host(&mut self, rep: u32) -> Result { + self.insert(Slot::HostTask { rep }) + } + + /// Returns the `rep` of the subtask at `idx` as well as if it's a host + /// task or not. + pub fn subtask_rep(&mut self, idx: u32) -> Result<(u32, bool)> { + match self.get_mut(idx)? { + Slot::GuestTask { rep } => Ok((*rep, false)), + Slot::HostTask { rep } => Ok((*rep, true)), + _ => bail!("handle is not a subtask"), + } + } + + /// Removes the subtask set at `idx`, returning its `rep`. + pub fn subtask_remove(&mut self, idx: u32) -> Result<(u32, bool)> { + let ret = match self.get_mut(idx)? { + Slot::GuestTask { rep } => (*rep, false), + Slot::HostTask { rep } => (*rep, true), + _ => bail!("handle is not a subtask"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Inserts `rep` as a waitable set into this table. + pub fn waitable_set_insert(&mut self, rep: u32) -> Result { + self.insert(Slot::WaitableSet { rep }) + } + + /// Returns the `rep` of an waitable-set pointed to by `idx`. + pub fn waitable_set_rep(&mut self, idx: u32) -> Result { + match self.get_mut(idx)? { + Slot::WaitableSet { rep, .. } => Ok(*rep), + _ => bail!("handle is not an waitable-set"), + } + } + + /// Removes the waitable set at `idx`, returning its `rep`. + pub fn waitable_set_remove(&mut self, idx: u32) -> Result { + let ret = match self.get_mut(idx)? { + Slot::WaitableSet { rep } => *rep, + _ => bail!("handle is not a waitable-set"), + }; + self.remove(idx)?; + Ok(ret) + } + + /// Returns the `rep` for the waitable specified by `idx` along with what + /// kind of waitable it is. + pub fn waitable_rep(&mut self, idx: u32) -> Result<(u32, Waitable)> { + match self.get_mut(idx)? { + Slot::GuestTask { rep } => Ok((*rep, Waitable::Subtask { is_host: false })), + Slot::HostTask { rep } => Ok((*rep, Waitable::Subtask { is_host: true })), + Slot::Future { rep, .. } => Ok((*rep, Waitable::Future)), + Slot::Stream { rep, .. } => Ok((*rep, Waitable::Stream)), + _ => bail!("handle is not a waitable"), + } + } + + /// TODO: delete this ideally? + pub fn waitable_by_rep(&mut self, rep: u32) -> Result { + let Some((idx, slot)) = self.get_mut_by_rep(rep) else { + bail!("handle does not exist") + }; + match slot { + Slot::GuestTask { .. } => Ok(idx), + Slot::HostTask { .. } => Ok(idx), + Slot::Future { .. } => Ok(idx), + Slot::Stream { .. } => Ok(idx), + _ => bail!("handle is not a waitable"), + } + } +} diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index be65a8820acc..09b968f19754 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -702,7 +702,8 @@ fn waitable_set_new( caller_instance: u32, ) -> Result { instance - .concurrent_state_mut(store) + .id() + .get_mut(store) .waitable_set_new(RuntimeComponentInstanceIndex::from_u32(caller_instance)) } @@ -735,7 +736,7 @@ fn waitable_set_drop( caller_instance: u32, set: u32, ) -> Result<()> { - instance.concurrent_state_mut(store).waitable_set_drop( + instance.id().get_mut(store).waitable_set_drop( RuntimeComponentInstanceIndex::from_u32(caller_instance), set, ) @@ -749,7 +750,7 @@ fn waitable_join( waitable: u32, set: u32, ) -> Result<()> { - instance.concurrent_state_mut(store).waitable_join( + instance.id().get_mut(store).waitable_join( RuntimeComponentInstanceIndex::from_u32(caller_instance), waitable, set, @@ -768,7 +769,7 @@ fn subtask_drop( caller_instance: u32, task_id: u32, ) -> Result<()> { - instance.concurrent_state_mut(store).subtask_drop( + instance.id().get_mut(store).subtask_drop( RuntimeComponentInstanceIndex::from_u32(caller_instance), task_id, ) @@ -876,7 +877,7 @@ fn future_transfer( src_table: u32, dst_table: u32, ) -> Result { - instance.concurrent_state_mut(store).future_transfer( + instance.id().get_mut(store).future_transfer( src_idx, TypeFutureTableIndex::from_u32(src_table), TypeFutureTableIndex::from_u32(dst_table), @@ -891,7 +892,7 @@ fn stream_transfer( src_table: u32, dst_table: u32, ) -> Result { - instance.concurrent_state_mut(store).stream_transfer( + instance.id().get_mut(store).stream_transfer( src_idx, TypeStreamTableIndex::from_u32(src_table), TypeStreamTableIndex::from_u32(dst_table), @@ -909,7 +910,8 @@ fn error_context_transfer( let src_table = TypeComponentLocalErrorContextTableIndex::from_u32(src_table); let dst_table = TypeComponentLocalErrorContextTableIndex::from_u32(dst_table); instance - .concurrent_state_mut(store) + .id() + .get_mut(store) .error_context_transfer(src_idx, src_table, dst_table) } @@ -927,7 +929,8 @@ unsafe impl HostResultHasUnwindSentinel for ResourcePair { #[cfg(feature = "component-model-async")] fn future_new(store: &mut dyn VMStore, instance: Instance, ty: u32) -> Result { instance - .concurrent_state_mut(store) + .id() + .get_mut(store) .future_new(TypeFutureTableIndex::from_u32(ty)) } @@ -975,7 +978,7 @@ fn future_cancel_write( async_: u8, writer: u32, ) -> Result { - instance.concurrent_state_mut(store).future_cancel_write( + instance.id().get_mut(store).future_cancel_write( TypeFutureTableIndex::from_u32(ty), async_ != 0, writer, @@ -990,7 +993,7 @@ fn future_cancel_read( async_: u8, reader: u32, ) -> Result { - instance.concurrent_state_mut(store).future_cancel_read( + instance.id().get_mut(store).future_cancel_read( TypeFutureTableIndex::from_u32(ty), async_ != 0, reader, @@ -1024,7 +1027,8 @@ fn future_drop_readable( #[cfg(feature = "component-model-async")] fn stream_new(store: &mut dyn VMStore, instance: Instance, ty: u32) -> Result { instance - .concurrent_state_mut(store) + .id() + .get_mut(store) .stream_new(TypeStreamTableIndex::from_u32(ty)) } @@ -1076,7 +1080,7 @@ fn stream_cancel_write( async_: u8, writer: u32, ) -> Result { - instance.concurrent_state_mut(store).stream_cancel_write( + instance.id().get_mut(store).stream_cancel_write( TypeStreamTableIndex::from_u32(ty), async_ != 0, writer, @@ -1091,7 +1095,7 @@ fn stream_cancel_read( async_: u8, reader: u32, ) -> Result { - instance.concurrent_state_mut(store).stream_cancel_read( + instance.id().get_mut(store).stream_cancel_read( TypeStreamTableIndex::from_u32(ty), async_ != 0, reader, @@ -1213,7 +1217,7 @@ fn error_context_drop( ty: u32, err_ctx_handle: u32, ) -> Result<()> { - instance.concurrent_state_mut(store).error_context_drop( + instance.id().get_mut(store).error_context_drop( TypeComponentLocalErrorContextTableIndex::from_u32(ty), err_ctx_handle, ) diff --git a/crates/wasmtime/src/runtime/vm/component/resources.rs b/crates/wasmtime/src/runtime/vm/component/resources.rs index b5e1acfdfc18..6b67ce2448d5 100644 --- a/crates/wasmtime/src/runtime/vm/component/resources.rs +++ b/crates/wasmtime/src/runtime/vm/component/resources.rs @@ -11,9 +11,6 @@ //! * `ResourceTables` - the "here's everything" context which is required to //! perform canonical ABI operations. //! -//! * `ResourceTable` - an individual instance of a table of resources, -//! basically "just a slab" though. -//! //! * `CallContexts` - store-local information about active calls and borrows //! and runtime state tracking that to ensure that everything is handled //! correctly. @@ -23,20 +20,15 @@ //! about ABI details can be found in lifting/lowering throughout Wasmtime, //! namely in the `Resource` and `ResourceAny` types. +use super::{HandleTable, RemovedResource}; use crate::prelude::*; use core::error::Error; use core::fmt; -use core::mem; use wasmtime_environ::PrimaryMap; use wasmtime_environ::component::{ ComponentTypes, RuntimeComponentInstanceIndex, TypeResourceTableIndex, }; -/// The maximum handle value is specified in -/// -/// currently and keeps the upper bit free for use in the component. -const MAX_RESOURCE_HANDLE: u32 = 1 << 30; - /// Contextual state necessary to perform resource-related operations. /// /// This state a bit odd since it has a few optional bits, but the idea is that @@ -61,7 +53,7 @@ pub struct ResourceTables<'a> { /// `ResourceAny::resource_drop` which won't consult this table as it's /// only operating over the host table. pub guest: Option<( - &'a mut PrimaryMap, + &'a mut PrimaryMap, &'a ComponentTypes, )>, @@ -72,23 +64,13 @@ pub struct ResourceTables<'a> { /// as-if they're in-component resources. The major distinction though is /// that this is a heterogeneous table instead of only containing a single /// type. - pub host_table: Option<&'a mut ResourceTable>, + pub host_table: Option<&'a mut HandleTable>, /// Scope information about calls actively in use to track information such /// as borrow counts. pub calls: &'a mut CallContexts, } -/// An individual slab of resources used for a single table within a component. -/// Not much fancier than a general slab data structure. -#[derive(Default)] -pub struct ResourceTable { - /// Next slot to allocate, or `self.slots.len()` if they're all full. - next: u32, - /// Runtime state of all slots. - slots: Vec, -} - /// Typed representation of a "rep" for a resource. /// /// All resources in the component model are stored in a single heterogeneous @@ -120,7 +102,7 @@ pub enum TypedResource { } impl TypedResource { - fn rep(&self, access_ty: &TypedResourceIndex) -> Result { + pub(super) fn rep(&self, access_ty: &TypedResourceIndex) -> Result { match (self, access_ty) { (Self::Host(rep), TypedResourceIndex::Host(_)) => Ok(*rep), (Self::Host(_), expected) => bail!(ResourceTypeMismatch { @@ -168,7 +150,7 @@ pub enum TypedResourceIndex { } impl TypedResourceIndex { - fn raw_index(&self) -> u32 { + pub(super) fn raw_index(&self) -> u32 { match self { Self::Host(index) | Self::Component { index, .. } => *index, } @@ -182,29 +164,6 @@ impl TypedResourceIndex { } } -enum Slot { - /// This slot is free and points to the next free slot, forming a linked - /// list of free slots. - Free { next: u32 }, - - /// This slot contains an owned resource with the listed representation. - /// - /// The `lend_count` tracks how many times this has been lent out as a - /// `borrow` and if nonzero this can't be removed. - Own { - resource: TypedResource, - lend_count: u32, - }, - - /// This slot contains a `borrow` resource that's connected to the `scope` - /// provided. The `rep` is listed and dropping this borrow will decrement - /// the borrow count of the `scope`. - Borrow { - resource: TypedResource, - scope: usize, - }, -} - /// State related to borrows and calls within a component. /// /// This is created once per `Store` and updated and modified throughout the @@ -233,7 +192,7 @@ pub struct CallContext { } impl ResourceTables<'_> { - fn table_for_resource(&mut self, resource: &TypedResource) -> &mut ResourceTable { + fn table_for_resource(&mut self, resource: &TypedResource) -> &mut HandleTable { match resource { TypedResource::Host(_) => self.host_table.as_mut().unwrap(), TypedResource::Component { ty, .. } => { @@ -243,7 +202,7 @@ impl ResourceTables<'_> { } } - fn table_for_index(&mut self, index: &TypedResourceIndex) -> &mut ResourceTable { + fn table_for_index(&mut self, index: &TypedResourceIndex) -> &mut HandleTable { match index { TypedResourceIndex::Host(_) => self.host_table.as_mut().unwrap(), TypedResourceIndex::Component { ty, .. } => { @@ -257,17 +216,15 @@ impl ResourceTables<'_> { /// /// Note that this is the same as `resource_lower_own`. pub fn resource_new(&mut self, resource: TypedResource) -> Result { - self.table_for_resource(&resource).insert(Slot::Own { - resource, - lend_count: 0, - }) + self.table_for_resource(&resource) + .resource_own_insert(resource) } /// Implementation of the `resource.rep` canonical intrinsic. /// /// This one's one of the simpler ones: "just get the rep please" pub fn resource_rep(&mut self, index: TypedResourceIndex) -> Result { - self.table_for_index(&index).rep(index) + self.table_for_index(&index).resource_rep(index) } /// Implementation of the `resource.drop` canonical intrinsic minus the @@ -284,22 +241,12 @@ impl ResourceTables<'_> { /// to run. If `None` is returned then that means a `borrow` handle was /// removed and no destructor is necessary. pub fn resource_drop(&mut self, index: TypedResourceIndex) -> Result> { - match self.table_for_index(&index).remove(index)? { - Slot::Own { - resource, - lend_count: 0, - } => resource.rep(&index).map(Some), - Slot::Own { .. } => bail!("cannot remove owned resource while borrowed"), - Slot::Borrow { - scope, resource, .. - } => { - // Validate that this borrow has the correct type to ensure a - // trap is returned if this is a mis-typed `resource.drop`. - resource.rep(&index)?; + match self.table_for_index(&index).remove_resource(index)? { + RemovedResource::Own { rep } => Ok(Some(rep)), + RemovedResource::Borrow { scope } => { self.calls.scopes[scope].borrow_count -= 1; Ok(None) } - Slot::Free { .. } => unreachable!(), } } @@ -313,10 +260,8 @@ impl ResourceTables<'_> { /// /// This is an implementation of the canonical ABI `lower_own` function. pub fn resource_lower_own(&mut self, resource: TypedResource) -> Result { - self.table_for_resource(&resource).insert(Slot::Own { - resource, - lend_count: 0, - }) + self.table_for_resource(&resource) + .resource_own_insert(resource) } /// Attempts to remove an "own" handle from the specified table and its @@ -328,14 +273,9 @@ impl ResourceTables<'_> { /// /// This is an implementation of the canonical ABI `lift_own` function. pub fn resource_lift_own(&mut self, index: TypedResourceIndex) -> Result { - match self.table_for_index(&index).remove(index)? { - Slot::Own { - resource, - lend_count: 0, - } => resource.rep(&index), - Slot::Own { .. } => bail!("cannot remove owned resource while borrowed"), - Slot::Borrow { .. } => bail!("cannot lift own resource from a borrow"), - Slot::Free { .. } => unreachable!(), + match self.table_for_index(&index).remove_resource(index)? { + RemovedResource::Own { rep } => Ok(rep), + RemovedResource::Borrow { .. } => bail!("cannot lift own resource from a borrow"), } } @@ -349,21 +289,12 @@ impl ResourceTables<'_> { /// /// This is an implementation of the canonical ABI `lift_borrow` function. pub fn resource_lift_borrow(&mut self, index: TypedResourceIndex) -> Result { - match self.table_for_index(&index).get_mut(index)? { - Slot::Own { - resource, - lend_count, - } => { - let rep = resource.rep(&index)?; - // The decrement to this count happens in `exit_call`. - *lend_count = lend_count.checked_add(1).unwrap(); - let scope = self.calls.scopes.last_mut().unwrap(); - scope.lenders.push(index); - Ok(rep) - } - Slot::Borrow { resource, .. } => resource.rep(&index), - Slot::Free { .. } => unreachable!(), + let (rep, is_own) = self.table_for_index(&index).resource_lend(index)?; + if is_own { + let scope = self.calls.scopes.last_mut().unwrap(); + scope.lenders.push(index); } + Ok(rep) } /// Records a new `borrow` resource with the given representation within the @@ -383,7 +314,7 @@ impl ResourceTables<'_> { let borrow_count = &mut self.calls.scopes.last_mut().unwrap().borrow_count; *borrow_count = borrow_count.checked_add(1).unwrap(); self.table_for_resource(&resource) - .insert(Slot::Borrow { resource, scope }) + .resource_borrow_insert(resource, scope) } /// Enters a new calling context, starting a fresh count of borrows and @@ -408,88 +339,14 @@ impl ResourceTables<'_> { // Note the panics here which should never get triggered in theory // due to the dynamic tracking of borrows and such employed for // resources. - match self.table_for_index(lender).get_mut(*lender).unwrap() { - Slot::Own { lend_count, .. } => { - *lend_count -= 1; - } - _ => unreachable!(), - } + self.table_for_index(lender) + .resource_undo_lend(*lender) + .unwrap(); } Ok(()) } } -impl ResourceTable { - fn insert(&mut self, new: Slot) -> Result { - let next = self.next as usize; - if next == self.slots.len() { - self.slots.push(Slot::Free { - next: self.next.checked_add(1).unwrap(), - }); - } - let ret = self.next; - self.next = match mem::replace(&mut self.slots[next], new) { - Slot::Free { next } => next, - _ => unreachable!(), - }; - - // The component model reserves index 0 as never allocatable so add one - // to the table index to start the numbering at 1 instead. Also note - // that the component model places an upper-limit per-table on the - // maximum allowed index. - let ret = ret + 1; - if ret >= MAX_RESOURCE_HANDLE { - bail!("cannot allocate another handle: index overflow"); - } - Ok(ret) - } - - fn handle_index_to_table_index(&self, idx: u32) -> Option { - // NB: `idx` is decremented by one to account for the `+1` above during - // allocation. - let idx = idx.checked_sub(1)?; - usize::try_from(idx).ok() - } - - fn rep(&self, idx: TypedResourceIndex) -> Result { - let slot = self - .handle_index_to_table_index(idx.raw_index()) - .and_then(|i| self.slots.get(i)); - match slot { - None | Some(Slot::Free { .. }) => bail!(UnknownHandleIndex(idx)), - Some(Slot::Own { resource, .. } | Slot::Borrow { resource, .. }) => resource.rep(&idx), - } - } - - fn get_mut(&mut self, idx: TypedResourceIndex) -> Result<&mut Slot> { - let slot = self - .handle_index_to_table_index(idx.raw_index()) - .and_then(|i| self.slots.get_mut(i)); - match slot { - None | Some(Slot::Free { .. }) => bail!(UnknownHandleIndex(idx)), - Some(other) => Ok(other), - } - } - - fn remove(&mut self, idx: TypedResourceIndex) -> Result { - let to_fill = Slot::Free { next: self.next }; - let ret = mem::replace(self.get_mut(idx)?, to_fill); - self.next = idx.raw_index() - 1; - Ok(ret) - } -} - -#[derive(Debug)] -struct UnknownHandleIndex(TypedResourceIndex); - -impl fmt::Display for UnknownHandleIndex { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "unknown handle index {}", self.0.raw_index()) - } -} - -impl Error for UnknownHandleIndex {} - #[derive(Debug)] struct ResourceTypeMismatch { expected: TypedResourceIndex, From 6d04eeff357376e1afbd00492b8c741829e1c549 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Tue, 12 Aug 2025 11:28:39 -0600 Subject: [PATCH 2/3] stop using `HandleTable::reps_to_indexes` when delivering events Per review feedback, we'd like to get rid of `HandleTable::reps_to_indexes` entirely. This commit doesn't go quite that far, but now we only use it for `error-context` handles. For waitables, which can only be referenced by at most one guest at a time, we now store the guest handle in `WaitableCommon::handle` and retrieve it from there when delivering an event for that waitable. For `error-context` handles, the spec requirement that we always lower the same handle for the same `error-context`, combined with the fact that an `error-context` may be referenced by more than one component instance at a time, means we still need some general way to convert a host rep plus component index into a handle. Going forward, we could consider either removing that "same handle" requirement from the spec or consider an alternative implementation (e.g. storing a `HashMap` in the `ErrorContext` host state for keeping track of the handles for each referencing instance. Signed-off-by: Joel Dice --- .../src/runtime/component/concurrent.rs | 109 ++++++------------ .../concurrent/futures_and_streams.rs | 66 ++++++++--- .../src/runtime/vm/component/handle_table.rs | 20 +--- 3 files changed, 88 insertions(+), 107 deletions(-) diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 4637627012fd..c497c120da9a 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -569,7 +569,7 @@ enum WaitMode { Fiber(StoreFiber<'static>), /// The guest task is waiting via a callback declared as part of an /// async-lifted export. - Callback(RuntimeComponentInstanceIndex), + Callback, } /// Represents the reason a fiber is suspending itself. @@ -594,13 +594,6 @@ enum GuestCallKind { /// Indicates there's an event to deliver to the task, possibly related to a /// waitable set the task has been waiting on or polling. DeliverEvent { - /// The (sub-)component instance in which the task has most recently - /// been executing. - /// - /// Note that this might not be the same as the instance the guest task - /// started executing in given that one or more synchronous guest->guest - /// calls may have occurred involving multiple instances. - instance: RuntimeComponentInstanceIndex, /// The waitable set the event belongs to, if any. /// /// If this is `None` the event will be waiting in the @@ -615,11 +608,7 @@ enum GuestCallKind { impl fmt::Debug for GuestCallKind { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Self::DeliverEvent { instance, set } => f - .debug_struct("DeliverEvent") - .field("instance", instance) - .field("set", set) - .finish(), + Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(), Self::Start(_) => f.debug_tuple("Start").finish(), } } @@ -672,13 +661,6 @@ struct PollParams { task: TableId, /// The waitable set being polled. set: TableId, - /// The (sub-)component instance in which the task has most recently been - /// executing. - /// - /// Note that this might not be the same as the instance the guest task - /// started executing in given that one or more synchronous guest->guest - /// calls may have occurred involving multiple instances. - instance: RuntimeComponentInstanceIndex, } /// Represents a pending work item to be handled by the event loop for a given @@ -781,10 +763,7 @@ impl ComponentInstance { task.event = Some(Event::None); state.push_low_priority(WorkItem::GuestCall(GuestCall { task: guest_task, - kind: GuestCallKind::DeliverEvent { - instance: runtime_instance, - set: None, - }, + kind: GuestCallKind::DeliverEvent { set: None }, })); } callback_code::WAIT | callback_code::POLL => { @@ -797,10 +776,7 @@ impl ComponentInstance { // An event is immediately available; deliver it ASAP. state.push_high_priority(WorkItem::GuestCall(GuestCall { task: guest_task, - kind: GuestCallKind::DeliverEvent { - instance: runtime_instance, - set: Some(set), - }, + kind: GuestCallKind::DeliverEvent { set: Some(set) }, })); } else { // No event is immediately available. @@ -810,7 +786,6 @@ impl ComponentInstance { // event has arrived after that. state.push_low_priority(WorkItem::Poll(PollParams { task: guest_task, - instance: runtime_instance, set, })); } @@ -826,7 +801,7 @@ impl ComponentInstance { let old = state .get_mut(set)? .waiting - .insert(guest_task, WaitMode::Callback(runtime_instance)); + .insert(guest_task, WaitMode::Callback); assert!(old.is_none()); } _ => unreachable!(), @@ -844,7 +819,6 @@ impl ComponentInstance { fn get_event( mut self: Pin<&mut Self>, guest_task: TableId, - instance: RuntimeComponentInstanceIndex, set: Option>, ) -> Result)>> { let state = self.as_mut().concurrent_state_mut(); @@ -863,15 +837,14 @@ impl ComponentInstance { }) .transpose()? { - let event = waitable.common(state)?.event.take().unwrap(); + let common = waitable.common(state)?; + let handle = common.handle.unwrap(); + let event = common.event.take().unwrap(); log::trace!( - "deliver event {event:?} to {guest_task:?} for {waitable:?}; set {set:?}" + "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}" ); - let handle = - self.as_mut().guest_tables().0[instance].waitable_by_rep(waitable.rep())?; - waitable.on_delivery(self, event); Some((event, Some((waitable, handle)))) @@ -973,6 +946,8 @@ impl ComponentInstance { } }; + waitable.common(concurrent_state)?.handle = None; + if waitable.take_event(concurrent_state)?.is_some() { bail!("cannot drop a subtask with an undelivered event"); } @@ -1396,7 +1371,6 @@ impl Instance { state.push_high_priority(WorkItem::GuestCall(GuestCall { task: params.task, kind: GuestCallKind::DeliverEvent { - instance: params.instance, set: Some(params.set), }, })); @@ -1407,7 +1381,6 @@ impl Instance { state.push_high_priority(WorkItem::GuestCall(GuestCall { task: params.task, kind: GuestCallKind::DeliverEvent { - instance: params.instance, set: Some(params.set), }, })); @@ -1487,15 +1460,9 @@ impl Instance { /// Execute the specified guest call. fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> { match call.kind { - GuestCallKind::DeliverEvent { - instance: runtime_instance, - set, - } => { - let (event, waitable) = self - .id() - .get_mut(store) - .get_event(call.task, runtime_instance, set)? - .unwrap(); + GuestCallKind::DeliverEvent { set } => { + let (event, waitable) = + self.id().get_mut(store).get_event(call.task, set)?.unwrap(); let state = self.concurrent_state_mut(store); let task = state.get_mut(call.task)?; let runtime_instance = task.instance; @@ -2257,7 +2224,7 @@ impl Instance { let state = self.concurrent_state_mut(store.0); - let event = Waitable::Guest(guest_task).take_event(state)?; + let event = guest_waitable.take_event(state)?; let Some(Event::Subtask { status }) = event else { unreachable!(); }; @@ -2271,13 +2238,13 @@ impl Instance { // It hasn't returned yet, but the caller is calling via an // async-lowered import, so we generate a handle for the task // waitable and return the status. - break ( - status, - Some( - self.id().get_mut(store.0).guest_tables().0[caller_instance] - .subtask_insert_guest(guest_task.rep())?, - ), - ); + let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance] + .subtask_insert_guest(guest_task.rep())?; + self.concurrent_state_mut(store.0) + .get_mut(guest_task)? + .common + .handle = Some(handle); + break (status, Some(handle)); } else { // The callee hasn't returned yet, and the caller is calling via // a sync-lowered import, so we loop and keep waiting until the @@ -2459,6 +2426,10 @@ impl Instance { self.concurrent_state_mut(store.0).push_future(future); let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance] .subtask_insert_host(task.rep())?; + self.concurrent_state_mut(store.0) + .get_mut(task)? + .common + .handle = Some(handle); log::trace!( "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}" ); @@ -2731,7 +2702,6 @@ impl Instance { async_, WaitableCheck::Wait(WaitableCheckParams { set: TableId::new(rep), - caller_instance, options, payload, }), @@ -2757,7 +2727,6 @@ impl Instance { async_, WaitableCheck::Poll(WaitableCheckParams { set: TableId::new(rep), - caller_instance, options, payload, }), @@ -2832,11 +2801,10 @@ impl Instance { let result = match check { // Deliver any pending events to the guest and return. WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => { - let event = self.id().get_mut(store).get_event( - guest_task, - params.caller_instance, - Some(params.set), - )?; + let event = self + .id() + .get_mut(store) + .get_event(guest_task, Some(params.set))?; let (ordinal, handle, result) = if wait { let (event, waitable) = event.unwrap(); @@ -2954,12 +2922,9 @@ impl Instance { .unwrap() { WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), - WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall { + WaitMode::Callback => WorkItem::GuestCall(GuestCall { task: guest_task, - kind: GuestCallKind::DeliverEvent { - instance, - set: None, - }, + kind: GuestCallKind::DeliverEvent { set: None }, }), }; concurrent_state.push_high_priority(item); @@ -3710,6 +3675,8 @@ struct WaitableCommon { event: Option, /// The set to which this waitable belongs, if any. set: Option>, + /// The handle with which the guest refers to this waitable, if any. + handle: Option, } /// Represents a Component Model Async `waitable`. @@ -3824,12 +3791,9 @@ impl Waitable { let item = match mode { WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), - WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall { + WaitMode::Callback => WorkItem::GuestCall(GuestCall { task, - kind: GuestCallKind::DeliverEvent { - instance, - set: Some(set), - }, + kind: GuestCallKind::DeliverEvent { set: Some(set) }, }), }; state.push_high_priority(item); @@ -4358,7 +4322,6 @@ fn unpack_callback_code(code: u32) -> (u32, u32) { /// `waitable-set.poll`. struct WaitableCheckParams { set: TableId, - caller_instance: RuntimeComponentInstanceIndex, options: OptionsIndex, payload: u32, } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 3f6d970904f7..c5a704aa3235 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -655,7 +655,9 @@ impl FutureReader { } let id = TableId::::new(rep); let concurrent_state = cx.instance_mut().concurrent_state_mut(); - let state = concurrent_state.get(id)?.state; + let future = concurrent_state.get_mut(id)?; + future.common.handle = None; + let state = future.state; if concurrent_state.get(state)?.done { bail!("cannot lift future after previous read succeeded"); @@ -722,14 +724,22 @@ pub(crate) fn lower_future_to_index( match ty { InterfaceType::Future(dst) => { let concurrent_state = cx.instance_mut().concurrent_state_mut(); - let state = concurrent_state - .get(TableId::::new(rep))? - .state; + let id = TableId::::new(rep); + let state = concurrent_state.get(id)?.state; let rep = concurrent_state.get(state)?.read_handle.rep(); - cx.instance_mut() + let handle = cx + .instance_mut() .table_for_transmit(TransmitIndex::Future(dst)) - .future_insert_read(dst, rep) + .future_insert_read(dst, rep)?; + + cx.instance_mut() + .concurrent_state_mut() + .get_mut(id)? + .common + .handle = Some(handle); + + Ok(handle) } _ => func::bad_type_info(), } @@ -1220,6 +1230,11 @@ impl StreamReader { bail!("cannot lift stream after being notified that the writable end dropped"); } let id = TableId::::new(rep); + cx.instance_mut() + .concurrent_state_mut() + .get_mut(id)? + .common + .handle = None; Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), @@ -1281,14 +1296,22 @@ pub(crate) fn lower_stream_to_index( match ty { InterfaceType::Stream(dst) => { let concurrent_state = cx.instance_mut().concurrent_state_mut(); - let state = concurrent_state - .get(TableId::::new(rep))? - .state; + let id = TableId::::new(rep); + let state = concurrent_state.get(id)?.state; let rep = concurrent_state.get(state)?.read_handle.rep(); - cx.instance_mut() + let handle = cx + .instance_mut() .table_for_transmit(TransmitIndex::Stream(dst)) - .stream_insert_read(dst, rep) + .stream_insert_read(dst, rep)?; + + cx.instance_mut() + .concurrent_state_mut() + .get_mut(id)? + .common + .handle = Some(handle); + + Ok(handle) } _ => func::bad_type_info(), } @@ -3048,7 +3071,7 @@ impl ComponentInstance { let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?; let table = self.as_mut().table_for_transmit(ty); - let (read, write) = match ty { + let (read_handle, write_handle) = match ty { TransmitIndex::Future(ty) => ( table.future_insert_read(ty, read.rep())?, table.future_insert_write(ty, write.rep())?, @@ -3058,7 +3081,15 @@ impl ComponentInstance { table.stream_insert_write(ty, write.rep())?, ), }; - Ok(ResourcePair { write, read }) + + let state = self.as_mut().concurrent_state_mut(); + state.get_mut(read)?.common.handle = Some(read_handle); + state.get_mut(write)?.common.handle = Some(write_handle); + + Ok(ResourcePair { + write: write_handle, + read: read_handle, + }) } /// Cancel a pending write for the specified stream or future from the guest. @@ -3167,10 +3198,15 @@ impl ComponentInstance { bail!("cannot lift after being notified that the writable end dropped"); } let dst_table = self.as_mut().table_for_transmit(dst); - match dst { + let handle = match dst { TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep), TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep), - } + }?; + self.concurrent_state_mut() + .get_mut(TableId::::new(rep))? + .common + .handle = Some(handle); + Ok(handle) } /// Implements the `future.new` intrinsic. diff --git a/crates/wasmtime/src/runtime/vm/component/handle_table.rs b/crates/wasmtime/src/runtime/vm/component/handle_table.rs index 9e777a37cc55..a4a8381b8d82 100644 --- a/crates/wasmtime/src/runtime/vm/component/handle_table.rs +++ b/crates/wasmtime/src/runtime/vm/component/handle_table.rs @@ -114,11 +114,7 @@ enum Slot { impl Slot { fn rep_for_reps_to_indexes(&self) -> Option { match self { - Self::HostTask { rep } - | Self::GuestTask { rep } - | Self::Stream { rep, .. } - | Self::Future { rep, .. } - | Self::ErrorContext { rep, .. } => Some(*rep), + Self::ErrorContext { rep, .. } => Some(*rep), _ => None, } } @@ -648,18 +644,4 @@ impl HandleTable { _ => bail!("handle is not a waitable"), } } - - /// TODO: delete this ideally? - pub fn waitable_by_rep(&mut self, rep: u32) -> Result { - let Some((idx, slot)) = self.get_mut_by_rep(rep) else { - bail!("handle does not exist") - }; - match slot { - Slot::GuestTask { .. } => Ok(idx), - Slot::HostTask { .. } => Ok(idx), - Slot::Future { .. } => Ok(idx), - Slot::Stream { .. } => Ok(idx), - _ => bail!("handle is not a waitable"), - } - } } From 0c4ea1c6483e666ce6163b9abd3105d42731e6d5 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Tue, 12 Aug 2025 14:19:18 -0600 Subject: [PATCH 3/3] remove `HandleTable::reps_to_indexes` Turns out the spec no longer requires that guests receive the same handle for a given `error-context` as the one they already have, so we no longer need this field -- nor do we need to maintain a per-component-instance reference count. Signed-off-by: Joel Dice --- .../concurrent/futures_and_streams.rs | 5 +- .../src/runtime/vm/component/handle_table.rs | 91 ++----------------- 2 files changed, 7 insertions(+), 89 deletions(-) diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index c5a704aa3235..61b798719d79 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -3152,8 +3152,7 @@ impl ComponentInstance { ) -> Result<()> { let local_handle_table = self.as_mut().table_for_error_context(ty); - // Reduce the local (sub)component ref count, removing tracking if necessary - let (rep, local_ref_removed) = local_handle_table.error_context_drop(error_context)?; + let rep = local_handle_table.error_context_drop(error_context)?; let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep); @@ -3167,8 +3166,6 @@ impl ComponentInstance { assert!(*global_ref_count >= 1); *global_ref_count -= 1; if *global_ref_count == 0 { - assert!(local_ref_removed); - state .global_error_context_ref_counts .remove(&global_ref_count_idx); diff --git a/crates/wasmtime/src/runtime/vm/component/handle_table.rs b/crates/wasmtime/src/runtime/vm/component/handle_table.rs index a4a8381b8d82..c550e2fde871 100644 --- a/crates/wasmtime/src/runtime/vm/component/handle_table.rs +++ b/crates/wasmtime/src/runtime/vm/component/handle_table.rs @@ -101,33 +101,13 @@ enum Slot { /// Represents an error-context handle. ErrorContext { - /// Number of references held by the (sub-)component. - /// - /// This does not include the number of references which might be held - /// by other (sub-)components. - local_ref_count: u32, - rep: u32, }, } -impl Slot { - fn rep_for_reps_to_indexes(&self) -> Option { - match self { - Self::ErrorContext { rep, .. } => Some(*rep), - _ => None, - } - } -} - pub struct HandleTable { next: u32, slots: Vec, - // TODO: This is a sparse table (where zero means "no entry"); it might make - // more sense to use a `HashMap` here, but we'd need one that's - // no_std-compatible. A `BTreeMap` might also be appropriate if we restrict - // ourselves to `alloc::collections`. - reps_to_indexes: Vec, } impl Default for HandleTable { @@ -135,7 +115,6 @@ impl Default for HandleTable { Self { next: 0, slots: Vec::new(), - reps_to_indexes: Vec::new(), } } } @@ -149,16 +128,6 @@ impl HandleTable { } fn insert(&mut self, slot: Slot) -> Result { - let rep_usize = slot - .rep_for_reps_to_indexes() - .map(|r| usize::try_from(r).unwrap()); - - if let Some(rep) = rep_usize { - if matches!(self.reps_to_indexes.get(rep), Some(idx) if *idx != 0) { - bail!("rep {rep} already exists in this table"); - } - } - let next = self.next as usize; if next == self.slots.len() { self.slots.push(Slot::Free { @@ -179,28 +148,14 @@ impl HandleTable { bail!("cannot allocate another handle: index overflow"); } - if let Some(rep) = rep_usize { - if self.reps_to_indexes.len() <= rep { - self.reps_to_indexes.resize(rep.checked_add(1).unwrap(), 0); - } - self.reps_to_indexes[rep] = ret; - } - Ok(ret) } fn remove(&mut self, idx: u32) -> Result<()> { let to_fill = Slot::Free { next: self.next }; let slot = self.get_mut(idx)?; - let rep = slot.rep_for_reps_to_indexes(); *slot = to_fill; self.next = idx - 1; - - if let Some(rep) = rep { - let rep = usize::try_from(rep).unwrap(); - assert_eq!(idx, self.reps_to_indexes[rep]); - self.reps_to_indexes[rep] = 0; - } Ok(()) } @@ -221,16 +176,6 @@ impl HandleTable { } } - fn get_mut_by_rep(&mut self, rep: u32) -> Option<(u32, &mut Slot)> { - let index = *self.reps_to_indexes.get(usize::try_from(rep).unwrap())?; - if index > 0 { - let slot = self.get_mut(index).unwrap(); - Some((index, slot)) - } else { - None - } - } - /// Inserts a new `own` resource into this table whose type/rep are /// specified by `resource`. pub fn resource_own_insert(&mut self, resource: TypedResource) -> Result { @@ -532,51 +477,27 @@ impl HandleTable { /// Inserts the error-context `rep` into this table, returning the index it /// now resides at. pub fn error_context_insert(&mut self, rep: u32) -> Result { - if let Some(( - dst_idx, - Slot::ErrorContext { - local_ref_count, .. - }, - )) = self.get_mut_by_rep(rep) - { - *local_ref_count += 1; - return Ok(dst_idx); - } - - self.insert(Slot::ErrorContext { - rep, - local_ref_count: 1, - }) + self.insert(Slot::ErrorContext { rep }) } /// Returns the `rep` of an error-context pointed to by `idx`. pub fn error_context_rep(&mut self, idx: u32) -> Result { match self.get_mut(idx)? { - Slot::ErrorContext { rep, .. } => Ok(*rep), + Slot::ErrorContext { rep } => Ok(*rep), _ => bail!("handle is not an error-context"), } } /// Drops the error-context pointed to by `idx`. /// - /// Returns the internal `rep` and whether the reference count has reached - /// 0 meaning it was fully removed. - pub fn error_context_drop(&mut self, idx: u32) -> Result<(u32, bool)> { + /// Returns the internal `rep`. + pub fn error_context_drop(&mut self, idx: u32) -> Result { let rep = match self.get_mut(idx)? { - Slot::ErrorContext { - rep, - local_ref_count, - } => { - *local_ref_count -= 1; - if *local_ref_count > 0 { - return Ok((*rep, false)); - } - *rep - } + Slot::ErrorContext { rep } => *rep, _ => bail!("handle is not an error-context"), }; self.remove(idx)?; - Ok((rep, true)) + Ok(rep) } /// Inserts `rep` as a guest subtask into this table.