Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions crates/test-programs/src/bin/async_http_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ impl Handler for Component {
assert!(chunk.is_empty());
}
StreamResult::Closed => break,
// FIXME(WebAssembly/component-model#490): this should
// be a panic but will require some spec changes because
// right now this and `Complete(0)` are the same.
StreamResult::Cancelled => {}
StreamResult::Cancelled => unreachable!(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ async fn test_tcp_sample_application(family: IpAddressFamily, bind_address: IpSo
},
async {
let (result, _) = data_tx.write(vec![]).await;
// FIXME(WebAssembly/component-model#490): this should be a
// panic but will require some spec changes because right
// now this and `Complete(0)` are the same.
assert_eq!(result, StreamResult::Cancelled);
assert_eq!(result, StreamResult::Complete(0));
let remaining = data_tx.write_all(first_message.into()).await;
assert!(remaining.is_empty());
drop(data_tx);
Expand Down
186 changes: 99 additions & 87 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use {
future::{self, FutureExt},
stream::{FuturesUnordered, StreamExt},
},
futures_and_streams::{FlatAbi, StreamFutureState, TableIndex, TransmitHandle},
futures_and_streams::{FlatAbi, ReturnCode, StreamFutureState, TableIndex, TransmitHandle},
once_cell::sync::Lazy,
ready_chunks::ReadyChunks,
states::StateTable,
Expand Down Expand Up @@ -99,21 +99,21 @@ enum Event {
CallStarted,
CallReturned,
StreamRead {
count: u32,
code: ReturnCode,
handle: u32,
ty: TypeStreamTableIndex,
},
StreamWrite {
count: u32,
code: ReturnCode,
pending: Option<(TypeStreamTableIndex, u32)>,
},
FutureRead {
count: u32,
code: ReturnCode,
handle: u32,
ty: TypeFutureTableIndex,
},
FutureWrite {
count: u32,
code: ReturnCode,
pending: Option<(TypeFutureTableIndex, u32)>,
},
}
Expand All @@ -125,10 +125,10 @@ impl Event {
Event::_CallStarting => (1, 0),
Event::CallStarted => (2, 0),
Event::CallReturned => (3, 0),
Event::StreamRead { count, .. } => (5, count),
Event::StreamWrite { count, .. } => (6, count),
Event::FutureRead { count, .. } => (7, count),
Event::FutureWrite { count, .. } => (8, count),
Event::StreamRead { code, .. } => (5, code.encode()),
Event::StreamWrite { code, .. } => (6, code.encode()),
Event::FutureRead { code, .. } => (7, code.encode()),
Event::FutureWrite { code, .. } => (8, code.encode()),
}
}
}
Expand Down Expand Up @@ -2820,18 +2820,20 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
future: u32,
address: u32,
) -> Result<u32> {
instance.guest_write(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Future(ty),
None,
future,
address,
1,
)
instance
.guest_write(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Future(ty),
None,
future,
address,
1,
)
.map(|result| result.encode())
}

fn future_read(
Expand All @@ -2845,18 +2847,20 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
future: u32,
address: u32,
) -> Result<u32> {
instance.guest_read(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Future(ty),
None,
future,
address,
1,
)
instance
.guest_read(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Future(ty),
None,
future,
address,
1,
)
.map(|result| result.encode())
}

fn stream_write(
Expand All @@ -2871,18 +2875,20 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
address: u32,
count: u32,
) -> Result<u32> {
instance.guest_write(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Stream(ty),
None,
stream,
address,
count,
)
instance
.guest_write(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Stream(ty),
None,
stream,
address,
count,
)
.map(|result| result.encode())
}

fn stream_read(
Expand All @@ -2897,18 +2903,20 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
address: u32,
count: u32,
) -> Result<u32> {
instance.guest_read(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Stream(ty),
None,
stream,
address,
count,
)
instance
.guest_read(
StoreContextMut(self),
memory,
realloc,
string_encoding,
async_,
TableIndex::Stream(ty),
None,
stream,
address,
count,
)
.map(|result| result.encode())
}

fn flat_stream_write(
Expand All @@ -2924,21 +2932,23 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
address: u32,
count: u32,
) -> Result<u32> {
instance.guest_write(
StoreContextMut(self),
memory,
realloc,
StringEncoding::Utf8 as u8,
async_,
TableIndex::Stream(ty),
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
instance
.guest_write(
StoreContextMut(self),
memory,
realloc,
StringEncoding::Utf8 as u8,
async_,
TableIndex::Stream(ty),
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
.map(|result| result.encode())
}

fn flat_stream_read(
Expand All @@ -2954,21 +2964,23 @@ unsafe impl<T> VMComponentAsyncStore for StoreInner<T> {
address: u32,
count: u32,
) -> Result<u32> {
instance.guest_read(
StoreContextMut(self),
memory,
realloc,
StringEncoding::Utf8 as u8,
async_,
TableIndex::Stream(ty),
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
instance
.guest_read(
StoreContextMut(self),
memory,
realloc,
StringEncoding::Utf8 as u8,
async_,
TableIndex::Stream(ty),
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
.map(|result| result.encode())
}

fn error_context_debug_message(
Expand Down
Loading
Loading