Skip to content

Commit 59d7034

Browse files
authored
Refine Guard* APIs and implementation (#11354)
This is a follow-up to #11325 with a number of cosmetic changes about the shape of the API and structure of the internals: * `{Stream,Future}{Reader,Writer}::guard` is now an alternative constructor to `Guard*::new` (import fewer types). * Internally `WithAccessor` and `DropWithStore` are removed in favor of direct `Drop for Guard*` impls. * An `Option` is used to replace `ManuallyDrop` and `unsafe` code. * `{Stream,Future}{Reader,Writer}::close{,_with}` now take `&mut self` instead of `self` to be more composable with `&mut self` arguments during `Drop` for other structures (e.g. build-your-own drop-with-store). * The type parameters on `Guard*` are simplified to just `T`, the future or stream payload, and `A: AsAccessor`. This helps cut down on the complexity of signatures. * `Guard*` types now have `into_{stream,future}` as an alternative to `.into()` which doesn't require type annotations.
1 parent 1df3468 commit 59d7034

File tree

4 files changed

+392
-223
lines changed

4 files changed

+392
-223
lines changed

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use {
1515
wasmtime::{
1616
Engine, Store,
1717
component::{
18-
GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, ResourceTable,
19-
VecBuffer,
18+
Accessor, GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker,
19+
ResourceTable, VecBuffer,
2020
},
2121
},
2222
wasmtime_wasi::p2::WasiCtxBuilder,
@@ -49,15 +49,15 @@ pub async fn async_watch_streams() -> Result<()> {
4949
let instance = linker.instantiate_async(&mut store, &component).await?;
5050

5151
// Test watching and then dropping the read end of a stream.
52-
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
52+
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
5353
instance
5454
.run_concurrent(&mut store, async |store| {
5555
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
5656
})
5757
.await?;
5858

5959
// Test dropping and then watching the read end of a stream.
60-
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
60+
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
6161
instance
6262
.run_concurrent(&mut store, async |store| {
6363
rx.close_with(store);
@@ -66,15 +66,15 @@ pub async fn async_watch_streams() -> Result<()> {
6666
.await?;
6767

6868
// Test watching and then dropping the write end of a stream.
69-
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
69+
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
7070
instance
7171
.run_concurrent(&mut store, async |store| {
7272
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
7373
})
7474
.await?;
7575

7676
// Test dropping and then watching the write end of a stream.
77-
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
77+
let (mut tx, mut rx) = instance.stream::<u8>(&mut store)?;
7878
instance
7979
.run_concurrent(&mut store, async |store| {
8080
tx.close_with(store);
@@ -83,15 +83,15 @@ pub async fn async_watch_streams() -> Result<()> {
8383
.await?;
8484

8585
// Test watching and then dropping the read end of a future.
86-
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
86+
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
8787
instance
8888
.run_concurrent(&mut store, async |store| {
8989
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
9090
})
9191
.await?;
9292

9393
// Test dropping and then watching the read end of a future.
94-
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
94+
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
9595
instance
9696
.run_concurrent(&mut store, async |store| {
9797
rx.close_with(store);
@@ -100,15 +100,15 @@ pub async fn async_watch_streams() -> Result<()> {
100100
.await?;
101101

102102
// Test watching and then dropping the write end of a future.
103-
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
103+
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
104104
instance
105105
.run_concurrent(&mut store, async |store| {
106106
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
107107
})
108108
.await?;
109109

110110
// Test dropping and then watching the write end of a future.
111-
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
111+
let (mut tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
112112
instance
113113
.run_concurrent(&mut store, async |store| {
114114
tx.close_with(store);
@@ -117,8 +117,11 @@ pub async fn async_watch_streams() -> Result<()> {
117117
.await?;
118118

119119
enum Event<'a> {
120-
Write(Option<GuardedStreamWriter<'a, u8, Ctx>>),
121-
Read(Option<GuardedStreamReader<'a, u8, Ctx>>, Option<u8>),
120+
Write(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
121+
Read(
122+
Option<GuardedStreamReader<u8, &'a Accessor<Ctx>>>,
123+
Option<u8>,
124+
),
122125
}
123126

124127
// Test watching, then writing to, then dropping, then writing again to the
@@ -212,9 +215,9 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> {
212215
let instance = linker.instantiate_async(&mut store, &component).await?;
213216

214217
enum StreamEvent<'a> {
215-
FirstWrite(Option<GuardedStreamWriter<'a, u8, Ctx>>),
216-
FirstRead(Option<GuardedStreamReader<'a, u8, Ctx>>, Vec<u8>),
217-
SecondWrite(Option<GuardedStreamWriter<'a, u8, Ctx>>),
218+
FirstWrite(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
219+
FirstRead(Option<GuardedStreamReader<u8, &'a Accessor<Ctx>>>, Vec<u8>),
220+
SecondWrite(Option<GuardedStreamWriter<u8, &'a Accessor<Ctx>>>),
218221
GuestCompleted,
219222
}
220223

crates/misc/component-async-tests/tests/scenario/transmit.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,18 @@ async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Re
364364

365365
enum Event<'a, Test: TransmitTest> {
366366
Result(Test::Result),
367-
ControlWriteA(Option<GuardedStreamWriter<'a, Control, Ctx>>),
368-
ControlWriteB(Option<GuardedStreamWriter<'a, Control, Ctx>>),
369-
ControlWriteC(Option<GuardedStreamWriter<'a, Control, Ctx>>),
367+
ControlWriteA(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
368+
ControlWriteB(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
369+
ControlWriteC(Option<GuardedStreamWriter<Control, &'a Accessor<Ctx>>>),
370370
ControlWriteD,
371371
WriteA,
372372
WriteB(bool),
373-
ReadC(Option<GuardedStreamReader<'a, String, Ctx>>, Option<String>),
373+
ReadC(
374+
Option<GuardedStreamReader<String, &'a Accessor<Ctx>>>,
375+
Option<String>,
376+
),
374377
ReadD(Option<String>),
375-
ReadNone(Option<GuardedStreamReader<'a, String, Ctx>>),
378+
ReadNone(Option<GuardedStreamReader<String, &'a Accessor<Ctx>>>),
376379
}
377380

378381
let (control_tx, control_rx) = instance.stream(&mut store)?;

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ impl HostTcpSocketWithStore for WasiSockets {
392392
) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
393393
store.with(|mut view| {
394394
let instance = view.instance();
395-
let (data_tx, data_rx) = instance
395+
let (mut data_tx, data_rx) = instance
396396
.stream(&mut view)
397397
.context("failed to create stream")?;
398398
let TcpSocket { tcp_state, .. } = get_socket_mut(view.get().table, &socket)?;
@@ -411,7 +411,7 @@ impl HostTcpSocketWithStore for WasiSockets {
411411
}
412412
prev => {
413413
*tcp_state = prev;
414-
let (result_tx, result_rx) = instance
414+
let (mut result_tx, result_rx) = instance
415415
.future(&mut view, || Err(ErrorCode::InvalidState))
416416
.context("failed to create future")?;
417417
result_tx.close(&mut view);

0 commit comments

Comments
 (0)