Skip to content

Commit 228515c

Browse files
authored
Don't support fallible drop in futures_and_streams (#11351)
* Don't support fallible drop in futures_and_streams This commit is a refinement of #11325 to use `.unwrap()` internally instead of ignoring errors from dropping futures and streams. Fallible drop isn't supported in Rust and these shouldn't panic assuming the host is properly matching handles to stores. * Fix build of wasmtime-wasi * Don't empty the table during store destruction Leave it around while futures/fibers are being manually dropped so any destructors associated there get access to the table (as required by streams/futures/etc). * Remove unused import
1 parent 5a8e46e commit 228515c

File tree

8 files changed

+56
-71
lines changed

8 files changed

+56
-71
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,71 +54,67 @@ pub async fn async_watch_streams() -> Result<()> {
5454
.run_concurrent(&mut store, async |store| {
5555
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
5656
})
57-
.await??;
57+
.await?;
5858

5959
// Test dropping and then watching the read end of a stream.
6060
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
6161
instance
6262
.run_concurrent(&mut store, async |store| {
63-
rx.close_with(store)?;
63+
rx.close_with(store);
6464
tx.watch_reader(store).await;
65-
anyhow::Ok(())
6665
})
67-
.await??;
66+
.await?;
6867

6968
// Test watching and then dropping the write end of a stream.
7069
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
7170
instance
7271
.run_concurrent(&mut store, async |store| {
7372
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
7473
})
75-
.await??;
74+
.await?;
7675

7776
// Test dropping and then watching the write end of a stream.
7877
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
7978
instance
8079
.run_concurrent(&mut store, async |store| {
81-
tx.close_with(store)?;
80+
tx.close_with(store);
8281
rx.watch_writer(store).await;
83-
anyhow::Ok(())
8482
})
85-
.await??;
83+
.await?;
8684

8785
// Test watching and then dropping the read end of a future.
8886
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
8987
instance
9088
.run_concurrent(&mut store, async |store| {
9189
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
9290
})
93-
.await??;
91+
.await?;
9492

9593
// Test dropping and then watching the read end of a future.
9694
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
9795
instance
9896
.run_concurrent(&mut store, async |store| {
99-
rx.close_with(store)?;
97+
rx.close_with(store);
10098
tx.watch_reader(store).await;
101-
anyhow::Ok(())
10299
})
103-
.await??;
100+
.await?;
104101

105102
// Test watching and then dropping the write end of a future.
106103
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
107104
instance
108105
.run_concurrent(&mut store, async |store| {
109106
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
110107
})
111-
.await??;
108+
.await?;
112109

113110
// Test dropping and then watching the write end of a future.
114111
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
115112
instance
116113
.run_concurrent(&mut store, async |store| {
117-
tx.close_with(store)?;
114+
tx.close_with(store);
118115
rx.watch_writer(store).await;
119-
anyhow::Ok(())
120116
})
121-
.await??;
117+
.await?;
122118

123119
enum Event<'a> {
124120
Write(Option<GuardedStreamWriter<'a, u8, Ctx>>),

crates/wasi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ test-programs-artifacts = { workspace = true }
4545
tempfile = { workspace = true }
4646
wasmtime = { workspace = true, features = ['cranelift', 'incremental-cache'] }
4747
wasmtime-test-util = { workspace = true }
48+
env_logger = { workspace = true }
4849

4950
[target.'cfg(unix)'.dependencies]
5051
rustix = { workspace = true, features = ["event", "fs", "net"] }

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,8 +414,8 @@ impl HostTcpSocketWithStore for WasiSockets {
414414
let (result_tx, result_rx) = instance
415415
.future(&mut view, || Err(ErrorCode::InvalidState))
416416
.context("failed to create future")?;
417-
result_tx.close(&mut view)?;
418-
data_tx.close(&mut view)?;
417+
result_tx.close(&mut view);
418+
data_tx.close(&mut view);
419419
Ok((data_rx, result_rx))
420420
}
421421
}

crates/wasi/tests/all/p3/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl wasmtime_wasi::p2::IoView for Ctx {
4646
}
4747

4848
async fn run(path: &str) -> anyhow::Result<()> {
49+
let _ = env_logger::try_init();
4950
let path = Path::new(path);
5051
let engine = test_programs_artifacts::engine(|config| {
5152
config.async_support(true);

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4308,9 +4308,9 @@ impl ConcurrentState {
43084308
fibers: &mut Vec<StoreFiber<'static>>,
43094309
futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
43104310
) {
4311-
for entry in mem::take(&mut self.table) {
4312-
if let Ok(set) = entry.downcast::<WaitableSet>() {
4313-
for mode in set.waiting.into_values() {
4311+
for entry in self.table.iter_mut() {
4312+
if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4313+
for mode in mem::take(&mut set.waiting).into_values() {
43144314
if let WaitMode::Fiber(fiber) = mode {
43154315
fibers.push(fiber);
43164316
}

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,10 @@ pub(super) struct FlatAbi {
407407
/// them) which require access to the store in order to be disposed of properly.
408408
trait DropWithStore: Sized {
409409
/// Dispose of `self` using the specified store.
410-
fn drop(&mut self, store: impl AsContextMut) -> Result<()>;
410+
fn drop(&mut self, store: impl AsContextMut);
411411

412412
/// Dispose of `self` using the specified accessor.
413-
fn drop_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
413+
fn drop_with(&mut self, accessor: impl AsAccessor) {
414414
accessor.as_accessor().with(|store| self.drop(store))
415415
}
416416
}
@@ -521,15 +521,15 @@ impl<T> FutureWriter<T> {
521521
}
522522

523523
/// Close this `FutureWriter`, writing the default value.
524-
pub fn close(mut self, store: impl AsContextMut) -> Result<()>
524+
pub fn close(mut self, store: impl AsContextMut)
525525
where
526526
T: func::Lower + Send + Sync + 'static,
527527
{
528528
self.drop(store)
529529
}
530530

531531
/// Close this `FutureWriter`, writing the default value.
532-
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()>
532+
pub fn close_with(mut self, accessor: impl AsAccessor)
533533
where
534534
T: func::Lower + Send + Sync + 'static,
535535
{
@@ -538,12 +538,13 @@ impl<T> FutureWriter<T> {
538538
}
539539

540540
impl<T: func::Lower + Send + Sync + 'static> DropWithStore for FutureWriter<T> {
541-
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
541+
fn drop(&mut self, mut store: impl AsContextMut) {
542542
// `self` should never be used again, but leave an invalid handle there just in case.
543543
let id = mem::replace(&mut self.id, TableId::new(0));
544544
let default = self.default;
545545
self.instance
546546
.host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
547+
.unwrap()
547548
}
548549
}
549550

@@ -714,25 +715,27 @@ impl<T> FutureReader<T> {
714715
}
715716

716717
/// Close this `FutureReader`.
717-
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
718+
pub fn close(mut self, store: impl AsContextMut) {
718719
self.drop(store)
719720
}
720721

721722
/// Close this `FutureReader`.
722-
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
723+
pub fn close_with(mut self, accessor: impl AsAccessor) {
723724
accessor.as_accessor().with(|access| self.drop(access))
724725
}
725726
}
726727

727728
impl<T> DropWithStore for FutureReader<T> {
728-
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
729+
fn drop(&mut self, mut store: impl AsContextMut) {
729730
// `self` should never be used again, but leave an invalid handle there just in case.
730731
let id = mem::replace(&mut self.id, TableId::new(0));
731-
self.instance.host_drop_reader(
732-
store.as_context_mut().0.traitobj_mut(),
733-
id,
734-
TransmitKind::Future,
735-
)
732+
self.instance
733+
.host_drop_reader(
734+
store.as_context_mut().0.traitobj_mut(),
735+
id,
736+
TransmitKind::Future,
737+
)
738+
.unwrap()
736739
}
737740
}
738741

@@ -980,22 +983,23 @@ impl<T> StreamWriter<T> {
980983
}
981984

982985
/// Close this `StreamWriter`.
983-
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
986+
pub fn close(mut self, store: impl AsContextMut) {
984987
self.drop(store)
985988
}
986989

987990
/// Close this `StreamWriter`.
988-
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
991+
pub fn close_with(mut self, accessor: impl AsAccessor) {
989992
accessor.as_accessor().with(|access| self.drop(access))
990993
}
991994
}
992995

993996
impl<T> DropWithStore for StreamWriter<T> {
994-
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
997+
fn drop(&mut self, mut store: impl AsContextMut) {
995998
// `self` should never be used again, but leave an invalid handle there just in case.
996999
let id = mem::replace(&mut self.id, TableId::new(0));
9971000
self.instance
9981001
.host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
1002+
.unwrap()
9991003
}
10001004
}
10011005

@@ -1182,25 +1186,27 @@ impl<T> StreamReader<T> {
11821186
}
11831187

11841188
/// Close this `StreamReader`.
1185-
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
1189+
pub fn close(mut self, store: impl AsContextMut) {
11861190
self.drop(store)
11871191
}
11881192

11891193
/// Close this `StreamReader`.
1190-
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
1194+
pub fn close_with(mut self, accessor: impl AsAccessor) {
11911195
accessor.as_accessor().with(|access| self.drop(access))
11921196
}
11931197
}
11941198

11951199
impl<T> DropWithStore for StreamReader<T> {
1196-
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
1200+
fn drop(&mut self, mut store: impl AsContextMut) {
11971201
// `self` should never be used again, but leave an invalid handle there just in case.
11981202
let id = mem::replace(&mut self.id, TableId::new(0));
1199-
self.instance.host_drop_reader(
1200-
store.as_context_mut().0.traitobj_mut(),
1201-
id,
1202-
TransmitKind::Stream,
1203-
)
1203+
self.instance
1204+
.host_drop_reader(
1205+
store.as_context_mut().0.traitobj_mut(),
1206+
id,
1207+
TransmitKind::Stream,
1208+
)
1209+
.unwrap()
12041210
}
12051211
}
12061212

crates/wasmtime/src/runtime/component/concurrent/table.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::collections::BTreeSet;
1313
use std::fmt;
1414
use std::hash::{Hash, Hasher};
1515
use std::marker::PhantomData;
16-
use std::vec::{self, Vec};
16+
use std::vec::Vec;
1717

1818
pub struct TableId<T> {
1919
rep: u32,
@@ -356,32 +356,12 @@ impl Table {
356356
}
357357
Ok(e)
358358
}
359-
}
360-
361-
pub struct TableIterator(vec::IntoIter<Entry>);
362-
363-
impl Iterator for TableIterator {
364-
type Item = Box<dyn Any + Send + Sync>;
365359

366-
fn next(&mut self) -> Option<Self::Item> {
367-
loop {
368-
if let Some(entry) = self.0.next() {
369-
if let Entry::Occupied { entry } = entry {
370-
break Some(entry.entry);
371-
}
372-
} else {
373-
break None;
374-
}
375-
}
376-
}
377-
}
378-
379-
impl IntoIterator for Table {
380-
type Item = Box<dyn Any + Send + Sync>;
381-
type IntoIter = TableIterator;
382-
383-
fn into_iter(self) -> TableIterator {
384-
TableIterator(self.entries.into_iter())
360+
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (dyn Any + Send + Sync)> {
361+
self.entries.iter_mut().filter_map(|entry| match entry {
362+
Entry::Occupied { entry } => Some(&mut *entry.entry),
363+
Entry::Free { .. } => None,
364+
})
385365
}
386366
}
387367

0 commit comments

Comments
 (0)