Skip to content

Commit 587ca6f

Browse files
authored
ensure component value lowerings are run on a worker fiber (#11353)
* ensure component value lowerings are run on a worker fiber

This is necessary because lowering a component value may require calling realloc, which may involve epoch interruption, which may require suspending the current fiber. Which obviously doesn't work if we're not running in a fiber. Also, we need to make sure there are no host frames on the stack.

Note the use of `Mutex` for `WorkItem::WorkerFunction` and `WorkerItem::Function`. We never lock the mutex -- it's only used to plumb through the inner, non-`Sync` value while satisfying the compiler that `Store` is `Sync`.

Signed-off-by: Joel Dice <joel.dice@fermyon.com>

* add "we're on a fiber" assertion to `LowerContext::new`

Signed-off-by: Joel Dice <joel.dice@fermyon.com>

---------

Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent d2e1761 commit 587ca6f

File tree

3 files changed

+62
-26
lines changed

3 files changed

+62
-26
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,12 @@ impl GuestCall {
674674
}
675675
}
676676

677+
/// Job to be run on a worker fiber.
678+
enum WorkerItem {
679+
GuestCall(GuestCall),
680+
Function(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
681+
}
682+
677683
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
678684
/// or `CallbackCode.POLL`).
679685
#[derive(Debug)]
@@ -702,6 +708,8 @@ enum WorkItem {
702708
GuestCall(GuestCall),
703709
/// A pending `task.poll` or `CallbackCode.POLL` operation.
704710
Poll(PollParams),
711+
/// A job to run on a worker fiber.
712+
WorkerFunction(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
705713
}
706714

707715
impl fmt::Debug for WorkItem {
@@ -711,6 +719,7 @@ impl fmt::Debug for WorkItem {
711719
Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
712720
Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
713721
Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
722+
Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
714723
}
715724
}
716725
}
@@ -1403,8 +1412,22 @@ impl Instance {
14031412
// immediately if one of them fails.
14041413
let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
14051414
Poll::Ready(Some(output)) => {
1406-
if let Err(e) = output.consume(store.0.traitobj_mut(), self) {
1407-
return Poll::Ready(Err(e));
1415+
match output {
1416+
HostTaskOutput::Result(Err(e)) => return Poll::Ready(Err(e)),
1417+
HostTaskOutput::Result(Ok(())) => {}
1418+
HostTaskOutput::Function(fun) => {
1419+
// Defer calling this function to a worker fiber
1420+
// in case it involves calling a guest realloc
1421+
// function as part of a lowering operation.
1422+
//
1423+
// TODO: This isn't necessary for _all_
1424+
// `HostOutput::Function`s, so we could optimize
1425+
// by adding another variant to `HostOutput` to
1426+
// distinguish which ones need it and which
1427+
// don't.
1428+
self.concurrent_state_mut(store.0)
1429+
.push_high_priority(WorkItem::WorkerFunction(Mutex::new(fun)))
1430+
}
14081431
}
14091432
Poll::Ready(true)
14101433
}
@@ -1424,12 +1447,14 @@ impl Instance {
14241447
let ready = mem::take(&mut state.low_priority);
14251448
if ready.is_empty() {
14261449
return match next {
1427-
// In this case, one of the futures in
1428-
// `ConcurrentState::futures` completed
1429-
// successfully, so we return now and continue the
1430-
// outer loop in case there is another one ready to
1431-
// complete.
1432-
Poll::Ready(true) => Poll::Ready(Ok(Either::Right(Vec::new()))),
1450+
Poll::Ready(true) => {
1451+
// In this case, one of the futures in
1452+
// `ConcurrentState::futures` completed
1453+
// successfully, so we return now and continue
1454+
// the outer loop in case there is another one
1455+
// ready to complete.
1456+
Poll::Ready(Ok(Either::Right(Vec::new())))
1457+
}
14331458
Poll::Ready(false) => {
14341459
// Poll the future we were passed one last time
14351460
// in case one of `ConcurrentState::futures` had
@@ -1534,7 +1559,8 @@ impl Instance {
15341559
WorkItem::GuestCall(call) => {
15351560
let state = self.concurrent_state_mut(store);
15361561
if call.is_ready(state)? {
1537-
self.run_on_worker(store, call).await?;
1562+
self.run_on_worker(store, WorkerItem::GuestCall(call))
1563+
.await?;
15381564
} else {
15391565
let task = state.get_mut(call.task)?;
15401566
if !task.starting_sent {
@@ -1583,6 +1609,9 @@ impl Instance {
15831609
}));
15841610
}
15851611
}
1612+
WorkItem::WorkerFunction(fun) => {
1613+
self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1614+
}
15861615
}
15871616

15881617
Ok(())
@@ -1628,23 +1657,25 @@ impl Instance {
16281657
}
16291658

16301659
/// Execute the specified guest call on a worker fiber.
1631-
async fn run_on_worker(self, store: &mut StoreOpaque, call: GuestCall) -> Result<()> {
1660+
async fn run_on_worker(self, store: &mut StoreOpaque, item: WorkerItem) -> Result<()> {
16321661
let worker = if let Some(fiber) = self.concurrent_state_mut(store).worker.take() {
16331662
fiber
16341663
} else {
16351664
fiber::make_fiber(store.traitobj_mut(), move |store| {
16361665
loop {
1637-
let call = self.concurrent_state_mut(store).guest_call.take().unwrap();
1638-
self.handle_guest_call(store, call)?;
1666+
match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1667+
WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1668+
WorkerItem::Function(fun) => fun.into_inner().unwrap()(store, self)?,
1669+
}
16391670

16401671
self.suspend(store, SuspendReason::NeedWork)?;
16411672
}
16421673
})?
16431674
};
16441675

1645-
let guest_call = &mut self.concurrent_state_mut(store).guest_call;
1646-
assert!(guest_call.is_none());
1647-
*guest_call = Some(call);
1676+
let worker_item = &mut self.concurrent_state_mut(store).worker_item;
1677+
assert!(worker_item.is_none());
1678+
*worker_item = Some(item);
16481679

16491680
self.resume_fiber(store, worker).await
16501681
}
@@ -4187,7 +4218,7 @@ pub struct ConcurrentState {
41874218
/// This helps us avoid creating a new fiber for each `GuestCall` work item.
41884219
worker: Option<StoreFiber<'static>>,
41894220
/// A place to stash the work item for which we're resuming a worker fiber.
4190-
guest_call: Option<GuestCall>,
4221+
worker_item: Option<WorkerItem>,
41914222

41924223
/// (Sub)Component specific error context tracking
41934224
///
@@ -4249,7 +4280,7 @@ impl ConcurrentState {
42494280
low_priority: Vec::new(),
42504281
suspend_reason: None,
42514282
worker: None,
4252-
guest_call: None,
4283+
worker_item: None,
42534284
error_context_tables,
42544285
global_error_context_ref_counts: BTreeMap::new(),
42554286
component: component.clone(),

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::{
33
Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable,
44
WaitableCommon, WaitableState,
55
};
6-
use crate::component::concurrent::{ConcurrentState, HostTaskOutput, tls};
6+
use crate::component::concurrent::{ConcurrentState, WorkItem};
77
use crate::component::func::{self, LiftContext, LowerContext, Options};
88
use crate::component::matching::InstanceType;
99
use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
@@ -24,7 +24,7 @@ use std::iter;
2424
use std::marker::PhantomData;
2525
use std::mem::{self, ManuallyDrop, MaybeUninit};
2626
use std::string::{String, ToString};
27-
use std::sync::Arc;
27+
use std::sync::{Arc, Mutex};
2828
use std::task::{Poll, Waker};
2929
use std::vec::Vec;
3030
use wasmtime_environ::component::{
@@ -1792,13 +1792,12 @@ impl Instance {
17921792
// embedder frames on the stack is unsound.
17931793
let (tx, rx) = oneshot::channel();
17941794
let token = StoreToken::new(store.as_context_mut());
1795-
self.concurrent_state_mut(store.0)
1796-
.push_future(Box::pin(async move {
1797-
HostTaskOutput::Result(tls::get(|store| {
1798-
_ = tx.send(accept(token.as_context_mut(store))?);
1799-
Ok(())
1800-
}))
1801-
}));
1795+
self.concurrent_state_mut(store.0).push_high_priority(
1796+
WorkItem::WorkerFunction(Mutex::new(Box::new(move |store, _| {
1797+
_ = tx.send(accept(token.as_context_mut(store))?);
1798+
Ok(())
1799+
}))),
1800+
);
18021801
Err(rx)
18031802
}
18041803
}

crates/wasmtime/src/runtime/component/func/options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ impl<'a, T: 'static> LowerContext<'a, T> {
241241
types: &'a ComponentTypes,
242242
instance: Instance,
243243
) -> LowerContext<'a, T> {
244+
#[cfg(all(debug_assertions, feature = "component-model-async"))]
245+
if store.engine().config().async_support {
246+
// Assert that we're running on a fiber, which is necessary in
247+
// case we call the guest's realloc function.
248+
store.0.with_blocking(|_, _| {});
249+
}
244250
LowerContext {
245251
store,
246252
options,

0 commit comments

Comments (0)