diff --git a/Cargo.lock b/Cargo.lock index d9e83e364a..8b78dd53cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5267,7 +5267,7 @@ dependencies = [ [[package]] name = "wit-bindgen" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/witx-bindgen#adbeb1bdadea0b85754fefef79a28198f6fdfd6c" +source = "git+https://github.com/bytecodealliance/witx-bindgen#b8d39b3f2b188154ead61079dd3d788851b6c183" dependencies = [ "wit-bindgen-rt 0.41.0", "wit-bindgen-rust-macro", @@ -5276,7 +5276,7 @@ dependencies = [ [[package]] name = "wit-bindgen-core" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/witx-bindgen#adbeb1bdadea0b85754fefef79a28198f6fdfd6c" +source = "git+https://github.com/bytecodealliance/witx-bindgen#b8d39b3f2b188154ead61079dd3d788851b6c183" dependencies = [ "anyhow", "heck 0.5.0", @@ -5295,7 +5295,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rt" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/witx-bindgen#adbeb1bdadea0b85754fefef79a28198f6fdfd6c" +source = "git+https://github.com/bytecodealliance/witx-bindgen#b8d39b3f2b188154ead61079dd3d788851b6c183" dependencies = [ "bitflags 2.6.0", "futures", @@ -5305,7 +5305,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/witx-bindgen#adbeb1bdadea0b85754fefef79a28198f6fdfd6c" +source = "git+https://github.com/bytecodealliance/witx-bindgen#b8d39b3f2b188154ead61079dd3d788851b6c183" dependencies = [ "anyhow", "heck 0.5.0", @@ -5320,7 +5320,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/witx-bindgen#adbeb1bdadea0b85754fefef79a28198f6fdfd6c" +source = "git+https://github.com/bytecodealliance/witx-bindgen#b8d39b3f2b188154ead61079dd3d788851b6c183" dependencies = [ "anyhow", "prettyplease", diff --git a/crates/test-programs/src/bin/async_http_echo.rs b/crates/test-programs/src/bin/async_http_echo.rs index cf04c66856..7820609be2 100644 --- a/crates/test-programs/src/bin/async_http_echo.rs +++ b/crates/test-programs/src/bin/async_http_echo.rs @@ -53,10 +53,7 @@ impl Handler for Component { assert!(chunk.is_empty()); } StreamResult::Closed => break, - // FIXME(WebAssembly/component-model#490): this should - // be a panic but will require some spec changes because - // right now this and `Complete(0)` are the same. - StreamResult::Cancelled => {} + StreamResult::Cancelled => unreachable!(), } } diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_sample_application.rs b/crates/test-programs/src/bin/p3_sockets_tcp_sample_application.rs index f37b92e113..b09eeb77b5 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_sample_application.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_sample_application.rs @@ -32,10 +32,7 @@ async fn test_tcp_sample_application(family: IpAddressFamily, bind_address: IpSo }, async { let (result, _) = data_tx.write(vec![]).await; - // FIXME(WebAssembly/component-model#490): this should be a - // panic but will require some spec changes because right - // now this and `Complete(0)` are the same. - assert_eq!(result, StreamResult::Cancelled); + assert_eq!(result, StreamResult::Complete(0)); let remaining = data_tx.write_all(first_message.into()).await; assert!(remaining.is_empty()); drop(data_tx); diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index cac500071e..4c4f1da31e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -20,7 +20,7 @@ use { future::{self, FutureExt}, stream::{FuturesUnordered, StreamExt}, }, - futures_and_streams::{FlatAbi, StreamFutureState, TableIndex, TransmitHandle}, + futures_and_streams::{FlatAbi, ReturnCode, StreamFutureState, TableIndex, TransmitHandle}, once_cell::sync::Lazy, ready_chunks::ReadyChunks, states::StateTable, @@ -99,21 +99,21 @@ enum Event { CallStarted, CallReturned, StreamRead { - count: u32, + code: ReturnCode, handle: u32, ty: TypeStreamTableIndex, }, StreamWrite { - count: u32, + code: ReturnCode, pending: Option<(TypeStreamTableIndex, u32)>, }, FutureRead { - count: u32, + code: ReturnCode, handle: u32, ty: TypeFutureTableIndex, }, FutureWrite { - count: u32, + code: ReturnCode, pending: Option<(TypeFutureTableIndex, u32)>, }, } @@ -125,10 +125,10 @@ impl Event { Event::_CallStarting => (1, 0), Event::CallStarted => (2, 0), Event::CallReturned => (3, 0), - Event::StreamRead { count, .. } => (5, count), - Event::StreamWrite { count, .. } => (6, count), - Event::FutureRead { count, .. } => (7, count), - Event::FutureWrite { count, .. } => (8, count), + Event::StreamRead { code, .. } => (5, code.encode()), + Event::StreamWrite { code, .. } => (6, code.encode()), + Event::FutureRead { code, .. } => (7, code.encode()), + Event::FutureWrite { code, .. } => (8, code.encode()), } } } @@ -2820,18 +2820,20 @@ unsafe impl VMComponentAsyncStore for StoreInner { future: u32, address: u32, ) -> Result { - instance.guest_write( - StoreContextMut(self), - memory, - realloc, - string_encoding, - async_, - TableIndex::Future(ty), - None, - future, - address, - 1, - ) + instance + .guest_write( + StoreContextMut(self), + memory, + realloc, + string_encoding, + async_, + TableIndex::Future(ty), + None, + future, + address, + 1, + ) + .map(|result| result.encode()) } fn future_read( @@ -2845,18 +2847,20 @@ unsafe impl VMComponentAsyncStore for StoreInner { future: u32, address: u32, ) -> Result { - instance.guest_read( - StoreContextMut(self), - memory, - realloc, - string_encoding, - async_, - TableIndex::Future(ty), - None, - future, - address, - 1, - ) + instance + .guest_read( + StoreContextMut(self), + memory, + realloc, + string_encoding, + async_, + TableIndex::Future(ty), + None, + future, + address, + 1, + ) + .map(|result| result.encode()) } fn stream_write( @@ -2871,18 +2875,20 @@ unsafe impl VMComponentAsyncStore for StoreInner { address: u32, count: u32, ) -> Result { - instance.guest_write( - StoreContextMut(self), - memory, - realloc, - string_encoding, - async_, - TableIndex::Stream(ty), - None, - stream, - address, - count, - ) + instance + .guest_write( + StoreContextMut(self), + memory, + realloc, + string_encoding, + async_, + TableIndex::Stream(ty), + None, + stream, + address, + count, + ) + .map(|result| result.encode()) } fn stream_read( @@ -2897,18 +2903,20 @@ unsafe impl VMComponentAsyncStore for StoreInner { address: u32, count: u32, ) -> Result { - instance.guest_read( - StoreContextMut(self), - memory, - realloc, - string_encoding, - async_, - TableIndex::Stream(ty), - None, - stream, - address, - count, - ) + instance + .guest_read( + StoreContextMut(self), + memory, + realloc, + string_encoding, + async_, + TableIndex::Stream(ty), + None, + stream, + address, + count, + ) + .map(|result| result.encode()) } fn flat_stream_write( @@ -2924,21 +2932,23 @@ unsafe impl VMComponentAsyncStore for StoreInner { address: u32, count: u32, ) -> Result { - instance.guest_write( - StoreContextMut(self), - memory, - realloc, - StringEncoding::Utf8 as u8, - async_, - TableIndex::Stream(ty), - Some(FlatAbi { - size: payload_size, - align: payload_align, - }), - stream, - address, - count, - ) + instance + .guest_write( + StoreContextMut(self), + memory, + realloc, + StringEncoding::Utf8 as u8, + async_, + TableIndex::Stream(ty), + Some(FlatAbi { + size: payload_size, + align: payload_align, + }), + stream, + address, + count, + ) + .map(|result| result.encode()) } fn flat_stream_read( @@ -2954,21 +2964,23 @@ unsafe impl VMComponentAsyncStore for StoreInner { address: u32, count: u32, ) -> Result { - instance.guest_read( - StoreContextMut(self), - memory, - realloc, - StringEncoding::Utf8 as u8, - async_, - TableIndex::Stream(ty), - Some(FlatAbi { - size: payload_size, - align: payload_align, - }), - stream, - address, - count, - ) + instance + .guest_read( + StoreContextMut(self), + memory, + realloc, + StringEncoding::Utf8 as u8, + async_, + TableIndex::Stream(ty), + Some(FlatAbi { + size: payload_size, + align: payload_align, + }), + stream, + address, + count, + ) + .map(|result| result.encode()) } fn error_context_debug_message( diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index a9cd7ba671..f9b5c75780 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -48,8 +48,37 @@ pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer}; mod buffers; -const BLOCKED: usize = 0xffff_ffff; -const CLOSED: usize = 0x8000_0000; +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum ReturnCode { + Blocked, + Completed(u32), + Closed(u32), + Cancelled(u32), +} + +impl ReturnCode { + pub fn encode(&self) -> u32 { + const BLOCKED: u32 = 0xffff_ffff; + const COMPLETED: u32 = 0x0; + const CLOSED: u32 = 0x1; + const CANCELLED: u32 = 0x2; + match self { + ReturnCode::Blocked => BLOCKED, + ReturnCode::Completed(n) => { + debug_assert!(*n < (1 << 28)); + (n << 4) | COMPLETED + } + ReturnCode::Closed(n) => { + debug_assert!(*n < (1 << 28)); + (n << 4) | CLOSED + } + ReturnCode::Cancelled(n) => { + debug_assert!(*n < (1 << 28)); + (n << 4) | CANCELLED + } + } + } +} #[derive(Copy, Clone, Debug)] pub(super) enum TableIndex { @@ -118,7 +147,7 @@ fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState { fn accept_reader, U>( mut buffer: B, tx: oneshot::Sender>, -) -> impl FnOnce(&mut ComponentInstance, Reader) -> Result + Send + Sync + 'static { +) -> impl FnOnce(&mut ComponentInstance, Reader) -> Result + Send + Sync + 'static { move |instance, reader| { let count = match reader { Reader::Guest { @@ -155,7 +184,7 @@ fn accept_reader, U>( buffer, closed: false, }); - count + ReturnCode::Completed(count.try_into().unwrap()) } Reader::Host { accept } => { let count = accept(buffer.remaining().as_ptr().cast(), buffer.remaining().len()); @@ -164,14 +193,14 @@ fn accept_reader, U>( buffer, closed: false, }); - count + ReturnCode::Completed(count.try_into().unwrap()) } Reader::End => { _ = tx.send(HostResult { buffer, closed: true, }); - CLOSED + ReturnCode::Closed(0) } }; @@ -182,7 +211,7 @@ fn accept_reader, U>( fn accept_writer, U>( mut buffer: B, tx: oneshot::Sender>, -) -> impl FnOnce(Writer) -> Result + Send + Sync + 'static { +) -> impl FnOnce(Writer) -> Result + Send + Sync + 'static { move |writer| { let count = match writer { Writer::Guest { @@ -214,7 +243,7 @@ fn accept_writer, U>( buffer, closed: false, }); - count + ReturnCode::Completed(count.try_into().unwrap()) } Writer::Host { pointer, count } => { let count = count.min(buffer.remaining_capacity()); @@ -223,14 +252,14 @@ fn accept_writer, U>( buffer, closed: false, }); - count + ReturnCode::Completed(count.try_into().unwrap()) } Writer::End => { _ = tx.send(HostResult { buffer, closed: true, }); - CLOSED + ReturnCode::Closed(0) } }; @@ -1249,7 +1278,7 @@ enum WriteState { post_write: PostWrite, }, HostReady { - accept: Box Result + Send + Sync>, + accept: Box Result + Send + Sync>, post_write: PostWrite, }, Closed, @@ -1272,7 +1301,7 @@ enum ReadState { handle: u32, }, HostReady { - accept: Box Result + Send + Sync>, + accept: Box Result + Send + Sync>, }, Closed, } @@ -1662,7 +1691,7 @@ impl ComponentInstance { .. } => { let read_handle = transmit.read_handle; - let count = accept_reader::(buffer, tx)( + let code = accept_reader::(buffer, tx)( self, Reader::Guest { lower: RawLowerContext { options: &options }, @@ -1672,22 +1701,23 @@ impl ComponentInstance { }, )?; - let count = u32::try_from(count).unwrap(); self.push_event( read_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureRead { count, ty, handle }, - TableIndex::Stream(ty) => Event::StreamRead { count, ty, handle }, + TableIndex::Future(ty) => Event::FutureRead { code, ty, handle }, + TableIndex::Stream(ty) => Event::StreamRead { code, ty, handle }, }, )?; } ReadState::HostReady { accept } => { - let count = accept(Writer::Host { + let code = accept(Writer::Host { pointer: buffer.remaining().as_ptr().cast(), count: buffer.remaining().len(), })?; - buffer.forget(count); + if let ReturnCode::Completed(n) = code { + buffer.forget(n.try_into().unwrap()); + } _ = tx.send(HostResult { buffer, closed: false, @@ -1749,7 +1779,7 @@ impl ComponentInstance { let instance = self as *mut _; let types = self.component_types(); let lift = unsafe { &mut LiftContext::new(store.0, &options, types, instance) }; - let count = accept_writer::(buffer, tx)(Writer::Guest { + let code = accept_writer::(buffer, tx)(Writer::Guest { lift, ty: payload(ty, types), address, @@ -1763,16 +1793,15 @@ impl ComponentInstance { true }; - let count = u32::try_from(count).unwrap(); self.push_event( write_handle.rep(), match ty { TableIndex::Future(ty) => Event::FutureWrite { - count, + code, pending: pending.then_some((ty, handle)), }, TableIndex::Stream(ty) => Event::StreamWrite { - count, + code, pending: pending.then_some((ty, handle)), }, }, @@ -1811,7 +1840,7 @@ impl ComponentInstance { Ok(()) } - fn host_cancel_write(&mut self, rep: u32) -> Result { + fn host_cancel_write(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; @@ -1830,10 +1859,10 @@ impl ComponentInstance { log::trace!("canceled write {rep}"); - Ok(0) + Ok(ReturnCode::Cancelled(0)) } - fn host_cancel_read(&mut self, rep: u32) -> Result { + fn host_cancel_read(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; @@ -1852,7 +1881,7 @@ impl ComponentInstance { log::trace!("canceled read {rep}"); - Ok(0) + Ok(ReturnCode::Cancelled(0)) } /// Close the writer end of a Future or Stream @@ -1901,15 +1930,14 @@ impl ComponentInstance { ReadState::GuestReady { ty, handle, .. } => { let read_handle = transmit.read_handle; - let push_param = CLOSED; + let code = ReturnCode::Closed(0); // Ensure the final read of the guest is queued, with appropriate closure indicator - let count = u32::try_from(push_param).unwrap(); self.push_event( read_handle.rep(), match ty { - TableIndex::Future(ty) => Event::FutureRead { count, ty, handle }, - TableIndex::Stream(ty) => Event::StreamRead { count, ty, handle }, + TableIndex::Future(ty) => Event::FutureRead { code, ty, handle }, + TableIndex::Stream(ty) => Event::StreamRead { code, ty, handle }, }, )?; } @@ -1974,16 +2002,16 @@ impl ComponentInstance { true }; - let count = u32::try_from(CLOSED).unwrap(); + let code = ReturnCode::Closed(0); self.push_event( write_handle.rep(), match ty { TableIndex::Future(ty) => Event::FutureWrite { - count, + code, pending: pending.then_some((ty, handle)), }, TableIndex::Stream(ty) => Event::StreamWrite { - count, + code, pending: pending.then_some((ty, handle)), }, }, @@ -2165,7 +2193,7 @@ impl ComponentInstance { handle: u32, address: u32, count: u32, - ) -> Result { + ) -> Result { if !async_ { bail!("synchronous stream and future writes not yet supported"); } @@ -2232,18 +2260,18 @@ impl ComponentInstance { rep, )?; + let code = ReturnCode::Completed(count.try_into().unwrap()); { - let count = u32::try_from(count).unwrap(); self.push_event( read_handle_rep, match read_ty { TableIndex::Future(ty) => Event::FutureRead { - count, + code, ty, handle: read_handle, }, TableIndex::Stream(ty) => Event::StreamRead { - count, + code, ty, handle: read_handle, }, @@ -2251,7 +2279,7 @@ impl ComponentInstance { )?; } - count + code } ReadState::HostReady { accept } => { @@ -2280,17 +2308,17 @@ impl ComponentInstance { post_write: PostWrite::Continue, }; - BLOCKED + ReturnCode::Blocked } - ReadState::Closed => CLOSED, + ReadState::Closed => ReturnCode::Closed(0), }; - if result != BLOCKED { + if result != ReturnCode::Blocked { *self.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write; } - Ok(u32::try_from(result).unwrap()) + Ok(result) } pub(super) fn guest_read( @@ -2305,13 +2333,12 @@ impl ComponentInstance { handle: u32, address: u32, count: u32, - ) -> Result { + ) -> Result { if !async_ { bail!("synchronous stream and future reads not yet supported"); } let address = usize::try_from(address).unwrap(); - let count = usize::try_from(count).unwrap(); let options = unsafe { Options::new( store.0.id(), @@ -2357,7 +2384,7 @@ impl ComponentInstance { let write_handle_rep = transmit.write_handle.rep(); - let count = count.min(write_count); + let count = usize::try_from(count).unwrap().min(write_count); self.copy( store.as_context_mut(), @@ -2379,34 +2406,34 @@ impl ComponentInstance { true }; + let code = ReturnCode::Completed(count.try_into().unwrap()); { - let count = u32::try_from(count).unwrap(); self.push_event( write_handle_rep, match write_ty { TableIndex::Future(ty) => Event::FutureWrite { - count, + code, pending: pending.then_some((ty, write_handle)), }, TableIndex::Stream(ty) => Event::StreamWrite { - count, + code, pending: pending.then_some((ty, write_handle)), }, }, )?; } - count + code } WriteState::HostReady { accept, post_write } => { - let count = accept( + let code = accept( self, Reader::Guest { lower: RawLowerContext { options: &options }, ty, address: usize::try_from(address).unwrap(), - count, + count: count.try_into().unwrap(), }, )?; @@ -2414,7 +2441,7 @@ impl ComponentInstance { self.get_mut(transmit_id)?.write = WriteState::Closed; } - count + code } WriteState::Open => { @@ -2430,20 +2457,25 @@ impl ComponentInstance { handle, }; - BLOCKED + ReturnCode::Blocked } - WriteState::Closed => CLOSED, + WriteState::Closed => ReturnCode::Closed(0), }; - if result != BLOCKED { + if result != ReturnCode::Blocked { *self.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read; } - Ok(u32::try_from(result).unwrap()) + Ok(result) } - fn guest_cancel_write(&mut self, ty: TableIndex, writer: u32, _async_: bool) -> Result { + fn guest_cancel_write( + &mut self, + ty: TableIndex, + writer: u32, + _async_: bool, + ) -> Result { let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = self.state_table(ty).get_mut_by_index(writer)? else { @@ -2465,7 +2497,12 @@ impl ComponentInstance { self.host_cancel_write(rep) } - fn guest_cancel_read(&mut self, ty: TableIndex, reader: u32, _async_: bool) -> Result { + fn guest_cancel_read( + &mut self, + ty: TableIndex, + reader: u32, + _async_: bool, + ) -> Result { let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = self.state_table(ty).get_mut_by_index(reader)? else { @@ -2760,6 +2797,7 @@ impl ComponentInstance { writer: u32, ) -> Result { self.guest_cancel_write(TableIndex::Future(ty), writer, async_) + .map(|result| result.encode()) } pub(crate) fn future_cancel_read( @@ -2769,6 +2807,7 @@ impl ComponentInstance { reader: u32, ) -> Result { self.guest_cancel_read(TableIndex::Future(ty), reader, async_) + .map(|result| result.encode()) } pub(crate) fn future_close_writable( @@ -2798,6 +2837,7 @@ impl ComponentInstance { writer: u32, ) -> Result { self.guest_cancel_write(TableIndex::Stream(ty), writer, async_) + .map(|result| result.encode()) } pub(crate) fn stream_cancel_read( @@ -2807,6 +2847,7 @@ impl ComponentInstance { reader: u32, ) -> Result { self.guest_cancel_read(TableIndex::Stream(ty), reader, async_) + .map(|result| result.encode()) } pub(crate) fn stream_close_writable(