Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit c42ed27

Browse files
authored
Refactor AbortHandle, renamed to JoinHandle (#11414)
* Refactor `AbortHandle`, renamed to `JoinHandle` This commit is inspired by recent discussions about providing more guarantees around dropping WASI resources. This commit renames `wasmtime::component::AbortHandle` to `wasmtime::component::JoinHandle` and additionally implements `Future for JoinHandle` to know when the associated task's future has been dropped. The renaming is intended to more closely align with `tokio::task::JoinHandle` where `tokio::task::AbortHandle` is notably different and doesn't have a `Future` implementation. The `Future` implementation helps embedders know exactly when a value has been dropped and synchronize on that. * Clarify lack of abort-on-drop behavior
1 parent c6dddea commit c42ed27

5 files changed

Lines changed: 270 additions & 58 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/wasmtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ tempfile = { workspace = true }
103103
libtest-mimic = { workspace = true }
104104
cranelift-native = { workspace = true }
105105
wasmtime-test-util = { workspace = true }
106+
tokio = { workspace = true, features = ["macros", "sync"] }
106107

107108
[build-dependencies]
108109
cc = { workspace = true, optional = true }

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use wasmtime_environ::component::{
8787
TypeTupleIndex,
8888
};
8989

90-
pub use abort::AbortHandle;
90+
pub use abort::JoinHandle;
9191
pub use futures_and_streams::{
9292
ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter,
9393
GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer,
@@ -226,7 +226,7 @@ where
226226
/// Spawn a background task.
227227
///
228228
/// See [`Accessor::spawn`] for details.
229-
pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
229+
pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
230230
where
231231
T: 'static,
232232
{
@@ -497,14 +497,14 @@ where
497497
/// or `future` such that the code to write to the write end of that
498498
/// `stream` or `future` must run after the function returns.
499499
///
500-
/// The returned [`AbortHandle`] may be used to cancel the task.
500+
/// The returned [`JoinHandle`] may be used to cancel the task.
501501
///
502502
/// # Panics
503503
///
504504
/// Panics if called within a closure provided to the [`Accessor::with`]
505505
/// function. This can only be called outside an active invocation of
506506
/// [`Accessor::with`].
507-
pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
507+
pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
508508
where
509509
T: 'static,
510510
{
@@ -1123,7 +1123,7 @@ impl ConcurrentState {
11231123
WaitableState::HostTask => {
11241124
let id = TableId::<HostTask>::new(rep);
11251125
let task = self.get(id)?;
1126-
if task.abort_handle.is_some() {
1126+
if task.join_handle.is_some() {
11271127
bail!("cannot drop a subtask which has not yet resolved");
11281128
}
11291129
(Waitable::Host(id), task.caller_instance, true)
@@ -1344,7 +1344,7 @@ impl Instance {
13441344
self,
13451345
mut store: impl AsContextMut<Data = U>,
13461346
task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
1347-
) -> AbortHandle {
1347+
) -> JoinHandle {
13481348
let mut store = store.as_context_mut();
13491349
let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
13501350
self.spawn_with_accessor(store, accessor, task)
@@ -1357,7 +1357,7 @@ impl Instance {
13571357
mut store: StoreContextMut<T>,
13581358
accessor: Accessor<T, D>,
13591359
task: impl AccessorTask<T, D, Result<()>>,
1360-
) -> AbortHandle
1360+
) -> JoinHandle
13611361
where
13621362
T: 'static,
13631363
D: HasData + ?Sized,
@@ -1368,7 +1368,7 @@ impl Instance {
13681368
// hook calls to poll and possibly spawn more background tasks on each
13691369
// iteration.
13701370
let (handle, future) =
1371-
AbortHandle::run(async move { HostTaskOutput::Result(task.run(&accessor).await) });
1371+
JoinHandle::run(async move { HostTaskOutput::Result(task.run(&accessor).await) });
13721372
self.concurrent_state_mut(store.0)
13731373
.push_future(Box::pin(async move {
13741374
future.await.unwrap_or(HostTaskOutput::Result(Ok(())))
@@ -2544,7 +2544,7 @@ impl Instance {
25442544

25452545
// Create an abortable future which hooks calls to poll and manages call
25462546
// context state for the future.
2547-
let (abort_handle, future) = AbortHandle::run(async move {
2547+
let (join_handle, future) = JoinHandle::run(async move {
25482548
let mut future = pin!(future);
25492549
let mut call_context = None;
25502550
future::poll_fn(move |cx| {
@@ -2586,7 +2586,7 @@ impl Instance {
25862586
// We create a new host task even though it might complete immediately
25872587
// (in which case we won't need to pass a waitable back to the guest).
25882588
// If it does complete immediately, we'll remove it before we return.
2589-
let task = state.push(HostTask::new(caller_instance, Some(abort_handle)))?;
2589+
let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
25902590

25912591
log::trace!("new host task child of {caller:?}: {task:?}");
25922592
let token = StoreToken::new(store.as_context_mut());
@@ -2604,7 +2604,7 @@ impl Instance {
26042604
let mut store = token.as_context_mut(store);
26052605
lower(store.as_context_mut(), instance, result?)?;
26062606
let state = instance.concurrent_state_mut(store.0);
2607-
state.get_mut(task)?.abort_handle.take();
2607+
state.get_mut(task)?.join_handle.take();
26082608
Waitable::Host(task).set_event(
26092609
state,
26102610
Some(Event::Subtask {
@@ -3106,7 +3106,7 @@ impl Instance {
31063106
log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
31073107

31083108
if let Waitable::Host(host_task) = waitable {
3109-
if let Some(handle) = concurrent_state.get_mut(host_task)?.abort_handle.take() {
3109+
if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
31103110
handle.abort();
31113111
return Ok(Status::ReturnCancelled as u32);
31123112
}
@@ -3678,18 +3678,18 @@ type HostTaskFuture = Pin<Box<dyn Future<Output = HostTaskOutput> + Send + 'stat
36783678
struct HostTask {
36793679
common: WaitableCommon,
36803680
caller_instance: RuntimeComponentInstanceIndex,
3681-
abort_handle: Option<AbortHandle>,
3681+
join_handle: Option<JoinHandle>,
36823682
}
36833683

36843684
impl HostTask {
36853685
fn new(
36863686
caller_instance: RuntimeComponentInstanceIndex,
3687-
abort_handle: Option<AbortHandle>,
3687+
join_handle: Option<JoinHandle>,
36883688
) -> Self {
36893689
Self {
36903690
common: WaitableCommon::default(),
36913691
caller_instance,
3692-
abort_handle,
3692+
join_handle,
36933693
}
36943694
}
36953695
}

0 commit comments

Comments
 (0)