Skip to content

Commit 548e398

Browse files
committed
Revamp component model stream/future host API (again)
This changes the host APIs for dealing with futures and streams from a "rendezvous"-style API to a callback-oriented one. Previously you would create e.g. a `StreamReader`/`StreamWriter` pair and call their `read` and `write` methods, respectively, and those methods would return `Future`s that resolved when the operation was matched with a corresponding `write` or `read` operation on the other end. With the new API, you instead provide a `StreamProducer` trait implementation whe creating the stream, whose `produce` method will be called as soon as a read happens, giving the implementation a chance to respond immediately without making the reader wait for a rendezvous. Likewise, you can match the read end of a stream to a `StreamConsumer` to respond immediately to writes. This model should reduce scheduling overhead and make it easier to e.g. pipe items to/from `AsyncWrite`/`AsyncRead` or `Sink`/`Stream` implementations without needing to explicitly spawn background tasks. In addition, the new API provides direct access to guest read and write buffers for `stream<u8>` operations, enabling zero-copy operations. Other changes: - I've removed the `HostTaskOutput`; we were using it to run extra code with access to the store after a host task completes, but we can do that more elegantly inside the future using `tls::get`. This also allowed me to simplify `Instance::poll_until` a bit. - I've removed the `watch_{reader,writer}` functionality; it's not needed now given that the runtime will automatically dispose of the producer or consumer when the other end of the stream or future is closed -- no need for embedder code to manage that. - In order to make `UntypedWriteBuffer` `Send`, I had to wrap its raw pointer `buf` field in a `SendSyncPtr`. - I've removed `{Future,Stream}Writer` entirely and moved `Instance::{future,stream}` to `{Future,Stream}Reader::new`, respectively. - I've added a bounds check to the beginnings of `Instance::guest_read` and `Instance::guest_write` so that we need not do it later in `Guest{Source,Destination}::remaining`, meaning those functions can be infallible. Note that I haven't updated `wasmtime-wasi` yet to match; that will happen in one or more follow-up commits. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 3aa3923 commit 548e398

File tree

12 files changed

+1510
-2003
lines changed

12 files changed

+1510
-2003
lines changed

crates/misc/component-async-tests/src/resource_stream.rs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use crate::util::MpscProducer;
12
use anyhow::Result;
2-
use wasmtime::component::{
3-
Accessor, AccessorTask, GuardedStreamWriter, Resource, StreamReader, StreamWriter,
4-
};
3+
use futures::channel::mpsc;
4+
use wasmtime::component::{Accessor, Resource, StreamReader};
55

66
use super::Ctx;
77

@@ -38,29 +38,15 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx {
3838
accessor: &Accessor<T, Self>,
3939
count: u32,
4040
) -> wasmtime::Result<StreamReader<Resource<ResourceStreamX>>> {
41-
struct Task {
42-
tx: StreamWriter<Resource<ResourceStreamX>>,
43-
44-
count: u32,
45-
}
46-
47-
impl<T> AccessorTask<T, Ctx, Result<()>> for Task {
48-
async fn run(self, accessor: &Accessor<T, Ctx>) -> Result<()> {
49-
let mut tx = GuardedStreamWriter::new(accessor, self.tx);
50-
for _ in 0..self.count {
51-
let item = accessor.with(|mut view| view.get().table.push(ResourceStreamX))?;
52-
tx.write_all(Some(item)).await;
53-
}
54-
Ok(())
41+
accessor.with(|mut access| {
42+
let (mut tx, rx) = mpsc::channel(usize::try_from(count).unwrap());
43+
for _ in 0..count {
44+
tx.try_send(access.get().table.push(ResourceStreamX)?)
45+
.unwrap()
5546
}
56-
}
57-
58-
let (tx, rx) = accessor.with(|mut view| {
59-
let instance = view.instance();
60-
instance.stream(&mut view)
61-
})?;
62-
accessor.spawn(Task { tx, count });
63-
Ok(rx)
47+
let instance = access.instance();
48+
Ok(StreamReader::new(instance, access, MpscProducer::new(rx)))
49+
})
6450
}
6551
}
6652

crates/misc/component-async-tests/src/util.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
1-
use futures::channel::oneshot;
1+
use anyhow::Result;
2+
use futures::{
3+
SinkExt, StreamExt,
4+
channel::{mpsc, oneshot},
5+
};
26
use std::thread;
7+
use wasmtime::component::{
8+
Accessor, Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
9+
StreamProducer, StreamState,
10+
};
311

412
pub async fn sleep(duration: std::time::Duration) {
513
if cfg!(miri) {
@@ -21,3 +29,111 @@ pub async fn sleep(duration: std::time::Duration) {
2129
tokio::time::sleep(duration).await;
2230
}
2331
}
32+
33+
pub struct MpscProducer<T> {
34+
rx: mpsc::Receiver<T>,
35+
closed: bool,
36+
}
37+
38+
impl<T: Send + Sync + 'static> MpscProducer<T> {
39+
pub fn new(rx: mpsc::Receiver<T>) -> Self {
40+
Self { rx, closed: false }
41+
}
42+
43+
fn state(&self) -> StreamState {
44+
if self.closed {
45+
StreamState::Closed
46+
} else {
47+
StreamState::Open
48+
}
49+
}
50+
}
51+
52+
impl<D, T: Send + Sync + Lower + 'static> StreamProducer<D, T> for MpscProducer<T> {
53+
async fn produce(
54+
&mut self,
55+
accessor: &Accessor<D>,
56+
destination: &mut Destination<T>,
57+
) -> Result<StreamState> {
58+
if let Some(item) = self.rx.next().await {
59+
let item = destination.write(accessor, Some(item)).await?;
60+
assert!(item.is_none());
61+
} else {
62+
self.closed = true;
63+
}
64+
65+
Ok(self.state())
66+
}
67+
68+
async fn when_ready(&mut self, _: &Accessor<D>) -> Result<StreamState> {
69+
Ok(self.state())
70+
}
71+
}
72+
73+
pub struct MpscConsumer<T> {
74+
tx: mpsc::Sender<T>,
75+
}
76+
77+
impl<T> MpscConsumer<T> {
78+
pub fn new(tx: mpsc::Sender<T>) -> Self {
79+
Self { tx }
80+
}
81+
82+
fn state(&self) -> StreamState {
83+
if self.tx.is_closed() {
84+
StreamState::Closed
85+
} else {
86+
StreamState::Open
87+
}
88+
}
89+
}
90+
91+
impl<D, T: Lift + 'static> StreamConsumer<D, T> for MpscConsumer<T> {
92+
async fn consume(
93+
&mut self,
94+
accessor: &Accessor<D>,
95+
source: &mut Source<'_, T>,
96+
) -> Result<StreamState> {
97+
let item = &mut None;
98+
accessor.with(|access| source.read(access, item))?;
99+
_ = self.tx.send(item.take().unwrap()).await;
100+
Ok(self.state())
101+
}
102+
103+
async fn when_ready(&mut self, _: &Accessor<D>) -> Result<StreamState> {
104+
Ok(self.state())
105+
}
106+
}
107+
108+
pub struct OneshotProducer<T> {
109+
rx: oneshot::Receiver<T>,
110+
}
111+
112+
impl<T> OneshotProducer<T> {
113+
pub fn new(rx: oneshot::Receiver<T>) -> Self {
114+
Self { rx }
115+
}
116+
}
117+
118+
impl<D, T: Send + 'static> FutureProducer<D, T> for OneshotProducer<T> {
119+
async fn produce(self, _: &Accessor<D>) -> Result<T> {
120+
Ok(self.rx.await?)
121+
}
122+
}
123+
124+
pub struct OneshotConsumer<T> {
125+
tx: oneshot::Sender<T>,
126+
}
127+
128+
impl<T> OneshotConsumer<T> {
129+
pub fn new(tx: oneshot::Sender<T>) -> Self {
130+
Self { tx }
131+
}
132+
}
133+
134+
impl<D, T: Send + 'static> FutureConsumer<D, T> for OneshotConsumer<T> {
135+
async fn consume(self, _: &Accessor<D>, value: T) -> Result<()> {
136+
_ = self.tx.send(value);
137+
Ok(())
138+
}
139+
}

crates/misc/component-async-tests/tests/scenario/round_trip.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ pub async fn test_round_trip(
237237
component_async_tests::round_trip::bindings::RoundTrip::new(&mut store, &instance)?;
238238

239239
if call_style == 0 || !cfg!(miri) {
240-
// Now do it again using `Instance::run_concurrent`:
240+
// Run the test using `Instance::run_concurrent`:
241241
instance
242242
.run_concurrent(&mut store, {
243243
let inputs_and_outputs = inputs_and_outputs

0 commit comments

Comments
 (0)