From d680050924783e2e6bc9f50e5486916bbb64c9b8 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Wed, 11 Jun 2025 13:05:36 -0600 Subject: [PATCH] implement new `stream`/`future` trap conditions As of https://github.com/WebAssembly/component-model/pull/524, we now trap if a guest tries to use a stream or future handle (besides dropping it) after it is "done" (see that PR for what "done" means). This required more careful tracking of the state of each end of the stream or future. In the process of debugging this into shape, I added some trace logging and addressed a couple of issues I noticed: - `wasi-http`'s `Response::into_http` wasn't noticing a non-empty buffer at the end of a stream - the `p3_http_middleware` test was not clearing its buffer before each read Signed-off-by: Joel Dice --- Cargo.lock | 12 +- Cargo.toml | 9 +- .../src/bin/p3_http_middleware.rs | 1 + crates/wasi-http/Cargo.toml | 1 + crates/wasi-http/src/p3/response.rs | 14 +- crates/wasi-http/tests/all/p3/outgoing.rs | 2 + crates/wasi-http/tests/all/p3/proxy.rs | 2 + crates/wasi/Cargo.toml | 1 + crates/wasi/tests/all/p3/mod.rs | 2 + .../src/runtime/component/concurrent.rs | 13 +- .../concurrent/futures_and_streams.rs | 292 ++++++++--- crates/wasmtime/src/runtime/func.rs | 2 +- .../component-model-async/trap-if-done.wast | 473 ++++++++++++++++++ 13 files changed, 720 insertions(+), 104 deletions(-) create mode 100644 tests/misc_testsuite/component-model-async/trap-if-done.wast diff --git a/Cargo.lock b/Cargo.lock index a6febf9af8..ccef9a39de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4755,6 +4755,7 @@ dependencies = [ "cap-rand", "cap-std", "cap-time-ext", + "env_logger 0.11.5", "fs-set-times", "futures", "io-extras", @@ -4795,6 +4796,7 @@ dependencies = [ "async-trait", "base64", "bytes", + "env_logger 0.11.5", "flate2", "futures", "http", @@ -5217,7 +5219,7 @@ dependencies = [ [[package]] name = "wit-bindgen" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "wit-bindgen-rt 0.42.1", "wit-bindgen-rust-macro", @@ -5226,7 +5228,7 @@ dependencies = [ [[package]] name = "wit-bindgen-core" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "heck 0.5.0", @@ -5254,7 +5256,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rt" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "bitflags 2.6.0", "futures", @@ -5264,7 +5266,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "heck 0.5.0", @@ -5279,7 +5281,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "prettyplease", diff --git a/Cargo.toml b/Cargo.toml index c419cd6a68..30ce6b67c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -631,9 +631,12 @@ lto = true # wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools" } # wasm-compose = { git = "https://github.com/bytecodealliance/wasm-tools" } # wasm-metadata = { git = "https://github.com/bytecodealliance/wasm-tools" } -wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen" } -wit-bindgen-rt = { git = "https://github.com/bytecodealliance/wit-bindgen" } -wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen-rt = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/wit-bindgen" } +wit-bindgen = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } +wit-bindgen-rt = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } +wit-bindgen-rust-macro = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } # wasmparser = { path = '../wasm-tools/crates/wasmparser' } # wat = { path = '../wasm-tools/crates/wat' } diff --git a/crates/test-programs/src/bin/p3_http_middleware.rs b/crates/test-programs/src/bin/p3_http_middleware.rs index e71af92242..ca066aa519 100644 --- a/crates/test-programs/src/bin/p3_http_middleware.rs +++ b/crates/test-programs/src/bin/p3_http_middleware.rs @@ -66,6 +66,7 @@ impl Handler for Component { let remaining = pipe_tx.write_all(mem::take(decoder.get_mut())).await; assert!(remaining.is_empty()); *decoder.get_mut() = remaining; + chunk.clear(); (status, chunk) = body.read(chunk).await; } diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index a8c4d28b5e..eeedda8f1a 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -34,6 +34,7 @@ rustls = { workspace = true } webpki-roots = { workspace = true } [dev-dependencies] +env_logger = { workspace = true } test-programs-artifacts = { workspace = true } test-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/wasi-http/src/p3/response.rs b/crates/wasi-http/src/p3/response.rs index 5f93280a96..b6e98747d3 100644 --- a/crates/wasi-http/src/p3/response.rs +++ b/crates/wasi-http/src/p3/response.rs @@ -214,14 +214,14 @@ impl Response { let fut = async move { loop { let (tail, mut rx_buffer) = contents.await; - if let Some(tail) = tail { - let buffer = rx_buffer.split(); - if !buffer.is_empty() { - if let Err(..) = contents_tx.send(buffer.freeze()).await { - break; - } - rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY); + let buffer = rx_buffer.split(); + if !buffer.is_empty() { + if let Err(..) = contents_tx.send(buffer.freeze()).await { + break; } + rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY); + } + if let Some(tail) = tail { contents = tail.read(rx_buffer).boxed(); } else { debug_assert!(rx_buffer.is_empty()); diff --git a/crates/wasi-http/tests/all/p3/outgoing.rs b/crates/wasi-http/tests/all/p3/outgoing.rs index dffa2071f1..5f9d5b22b8 100644 --- a/crates/wasi-http/tests/all/p3/outgoing.rs +++ b/crates/wasi-http/tests/all/p3/outgoing.rs @@ -9,6 +9,8 @@ foreach_p3_http!(assert_test_exists); use super::proxy::{p3_http_echo, p3_http_middleware, p3_http_middleware_with_chain}; async fn run(path: &str, server: &Server) -> anyhow::Result<()> { + _ = env_logger::try_init(); + let engine = test_programs_artifacts::engine(|config| { config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); config.async_support(true); diff --git a/crates/wasi-http/tests/all/p3/proxy.rs b/crates/wasi-http/tests/all/p3/proxy.rs index 3c8601ab46..6af11a5dec 100644 --- a/crates/wasi-http/tests/all/p3/proxy.rs +++ b/crates/wasi-http/tests/all/p3/proxy.rs @@ -106,6 +106,8 @@ pub async fn p3_http_middleware_with_chain() -> Result<()> { } async fn test_http_echo(component: &str, use_compression: bool) -> Result<()> { + _ = env_logger::try_init(); + let body = b"And the mome raths outgrabe"; // Prepare the raw body, optionally compressed if that's what we're diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 1e669c1e92..ba06f3db09 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -38,6 +38,7 @@ futures = { workspace = true } url = { workspace = true } [dev-dependencies] +env_logger = { workspace = true } tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net", "macros", "fs"] } test-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/wasi/tests/all/p3/mod.rs b/crates/wasi/tests/all/p3/mod.rs index a6f8fdf64f..d2a14f9008 100644 --- a/crates/wasi/tests/all/p3/mod.rs +++ b/crates/wasi/tests/all/p3/mod.rs @@ -104,6 +104,8 @@ impl WasiSocketsView for Ctx { } async fn run(path: &str) -> anyhow::Result<()> { + _ = env_logger::try_init(); + let path = Path::new(path); let engine = test_programs_artifacts::engine(|config| { config.async_support(true); diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index ababc50716..e63510552e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -4115,18 +4115,18 @@ impl Waitable { assert_eq!(rep, self.rep()); assert_eq!(*state, StreamFutureState::Busy); *state = match event { - Event::FutureRead { .. } => StreamFutureState::Read, - Event::FutureWrite { .. } => StreamFutureState::Write, + Event::FutureRead { .. } => StreamFutureState::Read { done: false }, + Event::FutureWrite { .. } => StreamFutureState::Write { done: false }, _ => unreachable!(), }; } Event::StreamRead { pending: Some((ty, handle)), - .. + code, } | Event::StreamWrite { pending: Some((ty, handle)), - .. + code, } => { let runtime_instance = instance.component().types()[ty].instance; let (rep, WaitableState::Stream(actual_ty, state)) = instance.waitable_tables() @@ -4139,9 +4139,10 @@ impl Waitable { assert_eq!(*actual_ty, ty); assert_eq!(rep, self.rep()); assert_eq!(*state, StreamFutureState::Busy); + let done = matches!(code, ReturnCode::Dropped(_)); *state = match event { - Event::StreamRead { .. } => StreamFutureState::Read, - Event::StreamWrite { .. } => StreamFutureState::Write, + Event::StreamRead { .. } => StreamFutureState::Read { done }, + Event::StreamWrite { .. } => StreamFutureState::Write { done }, _ => unreachable!(), }; } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index d3083049fd..8b52290981 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -25,6 +25,7 @@ use { }, std::{ boxed::Box, + fmt, future::Future, iter, marker::PhantomData, @@ -338,10 +339,20 @@ fn accept_writer, U>( /// given component instance. #[derive(Debug, Eq, PartialEq)] pub(super) enum StreamFutureState { - /// Only the write end is owned by this component instance. - Write, - /// Only the read end is owned by this component instance. - Read, + /// The write end of the stream or future. + Write { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, + /// The read end of the stream or future. + Read { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, /// A read or write is in progress. Busy, } @@ -638,13 +649,22 @@ impl HostFuture { get_mut_by_index_from(state_table, TableIndex::Future(src), index)?; match state { - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { state_table.remove_by_index(index)?; } - StreamFutureState::Write => bail!("cannot transfer write end of future"), + StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"), StreamFutureState::Busy => bail!("cannot transfer busy future"), } + let state = cx + .instance_mut() + .get(TableId::::new(rep))? + .state; + + if cx.instance_mut().get(state)?.done { + bail!("cannot lift future after previous read succeeded"); + } + Ok(Self::new(rep, cx.instance_handle())) } _ => func::bad_type_info(), @@ -668,7 +688,10 @@ pub(crate) fn lower_future_to_index( cx.instance_mut() .state_table(TableIndex::Future(dst)) - .insert(rep, WaitableState::Future(dst, StreamFutureState::Read)) + .insert( + rep, + WaitableState::Future(dst, StreamFutureState::Read { done: false }), + ) } _ => func::bad_type_info(), } @@ -1004,10 +1027,13 @@ impl HostStream { get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?; match state { - StreamFutureState::Read => { + StreamFutureState::Read { done: true } => bail!( + "cannot lift stream after being notified that the writable end dropped" + ), + StreamFutureState::Read { done: false } => { state_table.remove_by_index(index)?; } - StreamFutureState::Write => bail!("cannot transfer write end of stream"), + StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"), StreamFutureState::Busy => bail!("cannot transfer busy stream"), } @@ -1034,7 +1060,10 @@ pub(crate) fn lower_stream_to_index( cx.instance_mut() .state_table(TableIndex::Stream(dst)) - .insert(rep, WaitableState::Stream(dst, StreamFutureState::Read)) + .insert( + rep, + WaitableState::Stream(dst, StreamFutureState::Read { done: false }), + ) } _ => func::bad_type_info(), } @@ -1331,8 +1360,8 @@ struct TransmitState { writer_watcher: Option>, /// Like `writer_watcher`, but for the reverse direction. reader_watcher: Option>, - /// Whether the write end may be dropped or not. - may_drop_writer: bool, + /// Whether futher values may be transmitted via this stream or future. + done: bool, } impl Default for TransmitState { @@ -1344,7 +1373,7 @@ impl Default for TransmitState { write: WriteState::Open, reader_watcher: None, writer_watcher: None, - may_drop_writer: true, + done: false, } } } @@ -1379,6 +1408,17 @@ enum WriteState { Dropped, } +impl fmt::Debug for WriteState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Open => f.debug_tuple("Open").finish(), + Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(), + Self::HostReady { .. } => f.debug_tuple("HostReady").finish(), + Self::Dropped => f.debug_tuple("Dropped").finish(), + } + } +} + /// Represents the state of the read end of a stream or future. enum ReadState { /// The read end is open, but no read is pending. @@ -1400,6 +1440,17 @@ enum ReadState { Dropped, } +impl fmt::Debug for ReadState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Open => f.debug_tuple("Open").finish(), + Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(), + Self::HostReady { .. } => f.debug_tuple("HostReady").finish(), + Self::Dropped => f.debug_tuple("Dropped").finish(), + } + } +} + /// Parameter type to pass to a `ReadState::HostReady` closure. /// /// See also `accept_writer`. @@ -1462,7 +1513,7 @@ impl Instance { default: fn() -> T, mut store: S, ) -> Result<(FutureWriter, FutureReader)> { - let (write, read) = store.as_context_mut()[self.id()].new_transmit(TransmitKind::Future)?; + let (write, read) = store.as_context_mut()[self.id()].new_transmit()?; Ok(( FutureWriter::new( @@ -1498,7 +1549,7 @@ impl Instance { self, mut store: S, ) -> Result<(StreamWriter, StreamReader)> { - let (write, read) = store.as_context_mut()[self.id()].new_transmit(TransmitKind::Stream)?; + let (write, read) = store.as_context_mut()[self.id()].new_transmit()?; Ok(( StreamWriter::new( @@ -1694,7 +1745,7 @@ impl Instance { let transmit = store[self.id()] .get_mut(transmit_id) .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?; - transmit.may_drop_writer = true; + log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read); let new_state = if let ReadState::Dropped = &transmit.read { ReadState::Dropped @@ -1728,6 +1779,10 @@ impl Instance { handle, .. } => { + if let TransmitKind::Future = kind { + transmit.done = true; + } + let read_handle = transmit.read_handle; let code = accept_reader::(store.as_context_mut(), buffer, tx, kind)( store.0.traitobj_mut(), @@ -1807,6 +1862,7 @@ impl Instance { let transmit = store[self.id()] .get_mut(transmit_id) .with_context(|| rep.to_string())?; + log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write); let new_state = if let WriteState::Dropped = &transmit.write { WriteState::Dropped @@ -1833,6 +1889,10 @@ impl Instance { post_write, .. } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let write_handle = transmit.write_handle; let types = store[self.id()].component().types().clone(); let lift = @@ -1917,6 +1977,11 @@ impl Instance { let transmit = instance .get_mut(transmit_id) .with_context(|| format!("error closing reader {transmit_rep}"))?; + log::trace!( + "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}", + transmit.read, + transmit.write + ); transmit.read = ReadState::Dropped; transmit.reader_watcher = None; @@ -2167,19 +2232,30 @@ impl Instance { }; let instance = &mut store[self.id()]; let (rep, state) = instance.get_mut_by_index(ty, handle)?; - let StreamFutureState::Write = *state else { + let StreamFutureState::Write { done } = *state else { bail!( - "invalid handle {handle}; expected {:?}; got {:?}", - StreamFutureState::Write, + "invalid handle {handle}; expected `Write`; got {:?}", *state ); }; + + if done { + bail!("cannot write to stream after being notified that the readable end dropped"); + } + *state = StreamFutureState::Busy; let transmit_handle = TableId::::new(rep); let transmit_id = instance.get(transmit_handle)?.state; - log::trace!("guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?})",); let transmit = instance.get_mut(transmit_id)?; - transmit.may_drop_writer = true; + log::trace!( + "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}", + transmit.read + ); + + if transmit.done { + bail!("cannot write to future after previous write succeeded or readable end dropped"); + } + let new_state = if let ReadState::Dropped = &transmit.read { ReadState::Dropped } else { @@ -2212,6 +2288,10 @@ impl Instance { } => { assert_eq!(flat_abi, read_flat_abi); + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + // Note that zero-length reads and writes are handling specially // by the spec to allow each end to signal readiness to the // other. Quoting the spec: @@ -2309,6 +2389,10 @@ impl Instance { } ReadState::HostReady { accept } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let types = instance.component().types().clone(); let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self); @@ -2325,11 +2409,22 @@ impl Instance { ReturnCode::Blocked } - ReadState::Dropped => ReturnCode::Dropped(0), + ReadState::Dropped => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + + ReturnCode::Dropped(0) + } }; if result != ReturnCode::Blocked { - *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write; + *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TableIndex::Stream(_)) + ), + }; } Ok(result) @@ -2371,18 +2466,27 @@ impl Instance { }; let instance = &mut store[self.id()]; let (rep, state) = instance.get_mut_by_index(ty, handle)?; - let StreamFutureState::Read = *state else { - bail!( - "invalid handle {handle}; expected {:?}; got {:?}", - StreamFutureState::Read, - *state - ); + let StreamFutureState::Read { done } = *state else { + bail!("invalid handle {handle}; expected `Read`; got {:?}", *state); }; + + if done { + bail!("cannot read from stream after being notified that the writable end dropped"); + } + *state = StreamFutureState::Busy; let transmit_handle = TableId::::new(rep); let transmit_id = instance.get(transmit_handle)?.state; - log::trace!("guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?})",); let transmit = instance.get_mut(transmit_id)?; + log::trace!( + "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}", + transmit.write + ); + + if transmit.done { + bail!("cannot read from future after previous read succeeded"); + } + let new_state = if let WriteState::Dropped = &transmit.write { WriteState::Dropped } else { @@ -2415,6 +2519,10 @@ impl Instance { } => { assert_eq!(flat_abi, write_flat_abi); + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let write_handle_rep = transmit.write_handle.rep(); // See the comment in `guest_write` for the @@ -2505,6 +2613,10 @@ impl Instance { } WriteState::HostReady { accept, post_write } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let code = accept( store.0.traitobj_mut(), self, @@ -2532,7 +2644,12 @@ impl Instance { }; if result != ReturnCode::Blocked { - *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read; + *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TableIndex::Stream(_)) + ), + }; } Ok(result) @@ -2555,8 +2672,8 @@ impl Instance { } }; match state { - StreamFutureState::Read => {} - StreamFutureState::Write => { + StreamFutureState::Read { .. } => {} + StreamFutureState::Write { .. } => { bail!("passed write end to `{{stream|future}}.drop-readable`") } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), @@ -2834,10 +2951,7 @@ impl ComponentInstance { /// Allocate a new future or stream, including the `TransmitState` and the /// `TransmitHandle`s corresponding to the read and write ends. - fn new_transmit( - &mut self, - kind: TransmitKind, - ) -> Result<(TableId, TableId)> { + fn new_transmit(&mut self) -> Result<(TableId, TableId)> { let state_id = self.push(TransmitState::default())?; let write = self.push(TransmitHandle::new(state_id))?; @@ -2847,10 +2961,6 @@ impl ComponentInstance { state.write_handle = write; state.read_handle = read; - if let TransmitKind::Future = kind { - state.may_drop_writer = false; - } - log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",); Ok((write, read)) @@ -2883,16 +2993,15 @@ impl ComponentInstance { /// write ends to the (sub-)component instance to which the specified /// `TableIndex` belongs. fn guest_new(&mut self, ty: TableIndex) -> Result { - let (write, read) = self.new_transmit(match ty { - TableIndex::Future(_) => TransmitKind::Future, - TableIndex::Stream(_) => TransmitKind::Stream, - })?; - let read = self - .state_table(ty) - .insert(read.rep(), waitable_state(ty, StreamFutureState::Read))?; - let write = self - .state_table(ty) - .insert(write.rep(), waitable_state(ty, StreamFutureState::Write))?; + let (write, read) = self.new_transmit()?; + let read = self.state_table(ty).insert( + read.rep(), + waitable_state(ty, StreamFutureState::Read { done: false }), + )?; + let write = self.state_table(ty).insert( + write.rep(), + waitable_state(ty, StreamFutureState::Write { done: false }), + )?; Ok(ResourcePair { write, read }) } @@ -2901,10 +3010,14 @@ impl ComponentInstance { /// # Arguments /// /// * `rep` - The `TransmitState` rep for the stream or future. - /// * `kind` - Whether `rep` is for a stream or a future. - fn host_cancel_write(&mut self, rep: u32, kind: TransmitKind) -> Result { + fn host_cancel_write(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + log::trace!( + "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); let code = if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? @@ -2935,10 +3048,6 @@ impl ComponentInstance { log::trace!("cancelled write {transmit_id:?}"); - if let (TransmitKind::Future, ReturnCode::Cancelled(0)) = (kind, code) { - transmit.may_drop_writer = false; - } - Ok(code) } @@ -2950,6 +3059,11 @@ impl ComponentInstance { fn host_cancel_read(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + log::trace!( + "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}", + transmit.read, + transmit.write + ); let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? { let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else { @@ -2991,10 +3105,11 @@ impl ComponentInstance { let transmit = self .get_mut(transmit_id) .with_context(|| format!("error closing writer {transmit_rep}"))?; - - if !transmit.may_drop_writer { - bail!("cannot drop future write end without first writing a value") - } + log::trace!( + "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); transmit.writer_watcher = None; @@ -3007,6 +3122,13 @@ impl ComponentInstance { *post_write = PostWrite::Drop; } v @ WriteState::Open => { + if let (TransmitKind::Future, false) = ( + kind, + transmit.done || matches!(transmit.read, ReadState::Dropped), + ) { + bail!("cannot drop future write end without first writing a value") + } + *v = WriteState::Dropped; } WriteState::Dropped => unreachable!("write state is already dropped"), @@ -3087,27 +3209,26 @@ impl ComponentInstance { writer: u32, _async_: bool, ) -> Result { - let (rep, state) = self.state_table(ty).get_mut_by_index(writer)?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => bail!("invalid stream or future handle"), + let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = + self.state_table(ty).get_mut_by_index(writer)? + else { + bail!("invalid stream or future handle"); }; let id = TableId::::new(rep); log::trace!("guest cancel write {id:?} (handle {writer})"); match state { - StreamFutureState::Write => { + StreamFutureState::Write { .. } => { bail!("stream or future write cancelled when no write is pending") } - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { bail!("passed read end to `{{stream|future}}.cancel-write`") } StreamFutureState::Busy => { - *state = StreamFutureState::Write; + *state = StreamFutureState::Write { done: false }; } } let rep = self.get(id)?.state.rep(); - self.host_cancel_write(rep, kind) + self.host_cancel_write(rep) } /// Cancel a pending read for the specified stream or future from the guest. @@ -3125,14 +3246,14 @@ impl ComponentInstance { let id = TableId::::new(rep); log::trace!("guest cancel read {id:?} (handle {reader})"); match state { - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { bail!("stream or future read cancelled when no read is pending") } - StreamFutureState::Write => { + StreamFutureState::Write { .. } => { bail!("passed write end to `{{stream|future}}.cancel-read`") } StreamFutureState::Busy => { - *state = StreamFutureState::Read; + *state = StreamFutureState::Read { done: false }; } } let rep = self.get(id)?.state.rep(); @@ -3153,17 +3274,16 @@ impl ComponentInstance { } }; match state { - StreamFutureState::Write => {} - StreamFutureState::Read => { + StreamFutureState::Write { .. } => {} + StreamFutureState::Read { .. } => { bail!("passed read end to `{{stream|future}}.drop-writable`") } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } - let transmit_rep = self - .get(TableId::::new(transmit_rep))? - .state - .rep(); + let id = TableId::::new(transmit_rep); + let transmit_rep = self.get(id)?.state.rep(); + log::trace!("guest_drop_writable: drop writer {id:?}"); self.host_drop_writer(transmit_rep, kind) } @@ -3241,13 +3361,21 @@ impl ComponentInstance { let (_, src_state) = match_state(src_state)?; match src_state { - StreamFutureState::Read => { + StreamFutureState::Read { done: true } => { + bail!("cannot lift stream after being notified that the writable end dropped") + } + StreamFutureState::Read { done: false } => { src_table.remove_by_index(src_idx)?; let dst_table = &mut self.waitable_tables()[dst_instance]; - dst_table.insert(rep, make_state(dst, StreamFutureState::Read)) + dst_table.insert( + rep, + make_state(dst, StreamFutureState::Read { done: false }), + ) + } + StreamFutureState::Write { .. } => { + bail!("cannot transfer write end of stream or future") } - StreamFutureState::Write => bail!("cannot transfer write end of stream or future"), StreamFutureState::Busy => bail!("cannot transfer busy stream or future"), } } diff --git a/crates/wasmtime/src/runtime/func.rs b/crates/wasmtime/src/runtime/func.rs index aff565c30e..3c4b07a5dd 100644 --- a/crates/wasmtime/src/runtime/func.rs +++ b/crates/wasmtime/src/runtime/func.rs @@ -1706,7 +1706,7 @@ impl EntryStoreContext { /// This function restores the values stored in this struct. We invoke this /// function through this type's `Drop` implementation. This ensures that we /// even restore the values if we unwind the stack (e.g., because we are - /// panicing out of a Wasm execution). + /// panicking out of a Wasm execution). #[inline] fn exit_wasm(&mut self) { unsafe { diff --git a/tests/misc_testsuite/component-model-async/trap-if-done.wast b/tests/misc_testsuite/component-model-async/trap-if-done.wast new file mode 100644 index 0000000000..b8c832b052 --- /dev/null +++ b/tests/misc_testsuite/component-model-async/trap-if-done.wast @@ -0,0 +1,473 @@ +;;! component_model_async = true + +;; This test has two components $C and $D, where $D imports and calls $C. +;; $C contains utility functions used by $D to create futures/streams, +;; write to them and close them. $D uses these utility functions to test for +;; all the cases where, once a future/stream is "done", further uses of the +;; future/stream trap. +;; +;; $D exports a list of functions, one for each case of trapping. Since traps +;; take out their containing instance, a fresh instance of $Tester is created +;; for each call to a $D export. +;; +;; When testing traps involving the readable end, the exports of $D take a +;; "bool" parameter that toggles whether the trap is triggered by +;; {stream,future}.{read,write} or by lifting, and the top-level commands +;; pass 'false' and 'true'. +;; +;; (Copied from +;; https://github.com/WebAssembly/component-model/blob/fix-future/test/async/trap-if-done.wast) +(component definition $Tester + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "future.drop-writable" (func $future.drop-writable (param i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) + + (global $writable-end (mut i32) (i32.const 0)) + (global $ws (mut i32) (i32.const 0)) + + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $start-future (export "start-future") (result i32) + ;; create a new future, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $future.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $future-write (export "future-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $future.write (global.get $writable-end) (i32.const 16)) + ) + (func $acknowledge-future-write (export "acknowledge-future-write") + ;; confirm we got a FUTURE_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 5 (; FUTURE_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $future-drop-writable (export "future-drop-writable") + ;; maybe boom + (call $future.drop-writable (global.get $writable-end)) + ) + + (func $start-stream (export "start-stream") (result i32) + ;; create a new stream, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $stream.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $stream-write (export "stream-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $stream.write (global.get $writable-end) (i32.const 16) (i32.const 1)) + ) + (func $acknowledge-stream-write (export "acknowledge-stream-write") + ;; confirm we got a STREAM_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 3 (; STREAM_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x11 (; DROPPED=1 | (1<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $stream-drop-writable (export "stream-drop-writable") + ;; maybe boom + (call $stream.drop-writable (global.get $writable-end)) + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.write $FT async (memory $memory "mem") (core func $future.write)) + (canon future.drop-writable $FT (core func $future.drop-writable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.write" (func $future.write)) + (export "future.drop-writable" (func $future.drop-writable)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.drop-writable" (func $stream.drop-writable)) + )))) + (func (export "start-future") (result (future u8)) (canon lift (core func $cm "start-future"))) + (func (export "future-write") (result u32) (canon lift (core func $cm "future-write"))) + (func (export "acknowledge-future-write") (canon lift (core func $cm "acknowledge-future-write"))) + (func (export "future-drop-writable") (canon lift (core func $cm "future-drop-writable"))) + (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) + (func (export "stream-write") (result u32) (canon lift (core func $cm "stream-write"))) + (func (export "acknowledge-stream-write") (canon lift (core func $cm "acknowledge-stream-write"))) + (func (export "stream-drop-writable") (canon lift (core func $cm "stream-drop-writable"))) + ) + (component $D + (import "c" (instance $c + (export "start-future" (func (result (future u8)))) + (export "future-write" (func (result u32))) + (export "acknowledge-future-write" (func)) + (export "future-drop-writable" (func)) + (export "start-stream" (func (result (stream u8)))) + (export "stream-write" (func (result u32))) + (export "acknowledge-stream-write" (func)) + (export "stream-drop-writable" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) + (import "" "future.drop-readable" (func $future.drop-readable (param i32))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "start-future" (func $start-future (result i32))) + (import "" "future-write" (func $future-write (result i32))) + (import "" "acknowledge-future-write" (func $acknowledge-future-write)) + (import "" "future-drop-writable" (func $future-drop-writable)) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "stream-write" (func $stream-write (result i32))) + (import "" "acknowledge-stream-write" (func $acknowledge-stream-write)) + (import "" "stream-drop-writable" (func $stream-drop-writable)) + + (func $trap-after-future-eager-write (export "trap-after-future-eager-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; start a read on our end so the next write will succeed + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; calling future.write in $C now should trap + (drop (call $future-write)) + ) + (func $trap-after-future-async-write (export "trap-after-future-async-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; let $C see the write completed so the future is 'done' + (call $acknowledge-future-write) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-reader-dropped (export "trap-after-future-reader-dropped") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; drop our readable end before writer can write + (call $future.drop-readable (local.get $fr)) + + ;; let $C try to future.write and find out we DROPPED + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-eager-read (export "trap-after-future-eager-read") (param $bool i32) (result i32) + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-future-async-read (export "trap-after-future-async-read") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; read first, so it blocks + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should then succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; wait to see that our blocked future.read COMPLETED, producing '42' + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $fr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 4 (; FUTURE_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $fr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-stream-reader-eager-dropped (export "trap-after-stream-reader-eager-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; drop our readable end before writer can write + (call $stream.drop-readable (local.get $sr)) + + ;; let $C try to stream.write and find out we DROPPED + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call stream.write again in $C should trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-reader-async-dropped (export "trap-after-stream-reader-async-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; calling stream.write in $C should block + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our stream.read should then succeed eagerly + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x10 (; COMPLETED=0 | (1<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; then drop our readable end + (call $stream.drop-readable (local.get $sr)) + + ;; let $C see that it's stream.write COMPLETED and wrote 1 elem + (call $acknowledge-stream-write) + + ;; now calling stream.write again in $C will trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-writer-eager-dropped (export "trap-after-stream-writer-eager-dropped") (param $bool i32) (result i32) + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; immediately drop the writable end + (call $stream-drop-writable) + + ;; calling stream.read will see that the writer dropped + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (local.get $ret)) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + (func $trap-after-stream-writer-async-dropped (export "trap-after-stream-writer-async-dropped") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; start a read on our end first which will block + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; drop the writable end before writing anything + (call $stream-drop-writable) + + ;; wait to see that our blocked stream.read was DROPPED + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $sr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 2 (; STREAM_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $sr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.read $FT async (memory $memory "mem") (core func $future.read)) + (canon future.drop-readable $FT (core func $future.drop-readable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon lower (func $c "start-future") (core func $start-future')) + (canon lower (func $c "future-write") (core func $future-write')) + (canon lower (func $c "acknowledge-future-write") (core func $acknowledge-future-write')) + (canon lower (func $c "future-drop-writable") (core func $future-drop-writable')) + (canon lower (func $c "start-stream") (core func $start-stream')) + (canon lower (func $c "stream-write") (core func $stream-write')) + (canon lower (func $c "acknowledge-stream-write") (core func $acknowledge-stream-write')) + (canon lower (func $c "stream-drop-writable") (core func $stream-drop-writable')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.read" (func $future.read)) + (export "future.drop-readable" (func $future.drop-readable)) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "start-future" (func $start-future')) + (export "future-write" (func $future-write')) + (export "acknowledge-future-write" (func $acknowledge-future-write')) + (export "future-drop-writable" (func $future-drop-writable')) + (export "start-stream" (func $start-stream')) + (export "stream-write" (func $stream-write')) + (export "acknowledge-stream-write" (func $acknowledge-stream-write')) + (export "stream-drop-writable" (func $stream-drop-writable')) + )))) + (func (export "trap-after-future-eager-write") (canon lift (core func $core "trap-after-future-eager-write"))) + (func (export "trap-after-future-async-write") (canon lift (core func $core "trap-after-future-async-write"))) + (func (export "trap-after-future-reader-dropped") (canon lift (core func $core "trap-after-future-reader-dropped"))) + (func (export "trap-after-future-eager-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-eager-read"))) + (func (export "trap-after-future-async-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-async-read"))) + (func (export "trap-after-stream-reader-eager-dropped") (canon lift (core func $core "trap-after-stream-reader-eager-dropped"))) + (func (export "trap-after-stream-reader-async-dropped") (canon lift (core func $core "trap-after-stream-reader-async-dropped"))) + (func (export "trap-after-stream-writer-eager-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-eager-dropped"))) + (func (export "trap-after-stream-writer-async-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-async-dropped"))) + ) + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "trap-after-future-eager-write") (alias export $d "trap-after-future-eager-write")) + (func (export "trap-after-future-async-write") (alias export $d "trap-after-future-async-write")) + (func (export "trap-after-future-reader-dropped") (alias export $d "trap-after-future-reader-dropped")) + (func (export "trap-after-future-eager-read") (alias export $d "trap-after-future-eager-read")) + (func (export "trap-after-future-async-read") (alias export $d "trap-after-future-async-read")) + (func (export "trap-after-stream-reader-eager-dropped") (alias export $d "trap-after-stream-reader-eager-dropped")) + (func (export "trap-after-stream-reader-async-dropped") (alias export $d "trap-after-stream-reader-async-dropped")) + (func (export "trap-after-stream-writer-eager-dropped") (alias export $d "trap-after-stream-writer-eager-dropped")) + (func (export "trap-after-stream-writer-async-dropped") (alias export $d "trap-after-stream-writer-async-dropped")) +) + +(component instance $i1 $Tester) +(assert_trap (invoke "trap-after-future-eager-write") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i2 $Tester) +(assert_trap (invoke "trap-after-future-async-write") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i3 $Tester) +(assert_trap (invoke "trap-after-future-reader-dropped") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i4.1 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i4.2 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i5.1 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i5.2 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i6 $Tester) +(assert_trap (invoke "trap-after-stream-reader-eager-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i7 $Tester) +(assert_trap (invoke "trap-after-stream-reader-async-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i8.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i8.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped") +(component instance $i9.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i9.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped")