Skip to content

Commit 4194e0a

Browse files
committed
refactor {Stream,Future}|{Reader,Writer} APIs and internals
This makes a several changes to how `{Stream,Future}|{Reader,Writer}` work to make them more efficient and, in some ways, more ergonomic: - The background tasks have been removed, allowing reads and writes to complete without task context switching. We now only allocate and use oneshot channels lazily when the other end is not yet ready; this improves real world performance benchmarks (e.g. wasi-http request handling) considerably. - Instances of `{Stream,Future}Reader` can now be lifted and lowered directly; no need for `Host{Stream,Future}` anymore. - The type parameter for `Stream{Reader,Writer}` no longer refers to the buffer type -- just the payload type (i.e. `StreamReader<u8>` instead of `StreamReader<Vec<u8>>`), meaning any buffer type may be used for a given read or write operation. This also means the compiler needs help with type inference less often when calling `Instance::stream`. - Instances of `{Stream,Future}|{Reader,Writer}` now require access to the store in order to be disposed of properly. I've added RAII wrapper structs (`WithAccessor[AndValue]`) to help with this, and also updated `Store::drop` and `Instance::run_concurrent` to ensure the store thread-local is set when dropping futures closing over `&Accessor`s. - In order to ensure that resources containing `{Stream,Future}|{Reader,Writer}` instances are disposed of properly, I've added `LinkerInstance::resource_concurrent` and have updated `wasmtime-wit-bindgen` to use it. This gives resource drop functions access to a `StoreContextMut` via an `Accessor`, allowing the stream and future handles to be disposed of. - In order to make this work, I had to change `Accessor::instance` from a `Instance` to an `Option<Instance>`, which is awkward but temporary since we're planning to remove `Accessor::instance` entirely once we've moved concurrent state from `ComponentInstance` to `Store`. That problem of disposal is definitely the most awkward part of all this. In simple cases, it's easy enough to ensure that read and write handles are disposed of properly, but both `wasmtime-wasi` and `wasmtime-wasi-http` have some pretty complicated functions where handles are passed between tasks and/or stored inside resources, so it can be tricky to ensure proper disposal on all code paths. I'm open to ideas for improving this, but I suspect we'll need new Rust language features (e.g. linear types) to make it truly ergonomic, robust, and efficient. While testing the above, I discovered an issue with `Instance::poll_until` such that it would prematurely give up and return a "deadlock" trap error, believing that there was no further work to do, even though the future passed to it was ready to resolve the next time it was polled. I've fixed this by polling it one last time and only trapping if it returns pending. Note that I've moved a few associated functions from `ConcurrentState` to `Instance` (e.g. `guest_drop_writable` and others) since they now need access to the store; they're unchanged otherwise. Apologies for the diff noise. Finally, I've tweaked how `wasmtime serve` to poll the guest for content before handing the response to Hyper, which helps performance by ensuring the first content chunk can be sent with the same TCP packet as the beginning of the response. Signed-off-by: Joel Dice <joel.dice@fermyon.com> fix wasi p3 build and test failures Signed-off-by: Joel Dice <joel.dice@fermyon.com> use `ManuallyDrop` instead of `Option` in `Dropper` This allows us to drop its `value` field in-place, i.e. without moving it, thereby upholding the `Pin` guarantee. Signed-off-by: Joel Dice <joel.dice@fermyon.com> address review comments - Remove `DropWithStoreAndValue` and friends; go back to taking a `fn() -> T` parameter in `Instance::future` instead - Make `DropWithStore::drop[_with]` take `&mut self` instead of `self` - Make `WithAccessor` and `DropWithStore` private - Instead, I've added public `Guarded{Stream,Future}{Reader,Writer}` types for RAII - and also `{Stream,Future}{Reader,Writer}::close[_with]` methods - Use RAII in `FutureReader::read` and `FutureWriter::write` to ensure handles are dropped if the `Future` is dropped Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 7074afb commit 4194e0a

File tree

15 files changed

+1528
-1391
lines changed

15 files changed

+1528
-1391
lines changed

crates/misc/component-async-tests/src/resource_stream.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use anyhow::Result;
2-
use wasmtime::component::{Accessor, AccessorTask, HostStream, Resource, StreamWriter};
2+
use wasmtime::component::{
3+
Accessor, AccessorTask, GuardedStreamWriter, Resource, StreamReader, StreamWriter,
4+
};
35
use wasmtime_wasi::p2::IoView;
46

57
use super::Ctx;
@@ -36,31 +38,31 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx {
3638
async fn foo<T: 'static>(
3739
accessor: &Accessor<T, Self>,
3840
count: u32,
39-
) -> wasmtime::Result<HostStream<Resource<ResourceStreamX>>> {
41+
) -> wasmtime::Result<StreamReader<Resource<ResourceStreamX>>> {
4042
struct Task {
41-
tx: StreamWriter<Option<Resource<ResourceStreamX>>>,
43+
tx: StreamWriter<Resource<ResourceStreamX>>,
4244

4345
count: u32,
4446
}
4547

4648
impl<T> AccessorTask<T, Ctx, Result<()>> for Task {
4749
async fn run(self, accessor: &Accessor<T, Ctx>) -> Result<()> {
48-
let mut tx = self.tx;
50+
let mut tx = GuardedStreamWriter::new(accessor, self.tx);
4951
for _ in 0..self.count {
5052
let item =
5153
accessor.with(|mut view| view.get().table().push(ResourceStreamX))?;
52-
tx.write_all(accessor, Some(item)).await;
54+
tx.write_all(Some(item)).await;
5355
}
5456
Ok(())
5557
}
5658
}
5759

5860
let (tx, rx) = accessor.with(|mut view| {
5961
let instance = view.instance();
60-
instance.stream::<_, _, Option<_>>(&mut view)
62+
instance.stream(&mut view)
6163
})?;
6264
accessor.spawn(Task { tx, count });
63-
Ok(rx.into())
65+
Ok(rx)
6466
}
6567
}
6668

0 commit comments

Comments
 (0)