Skip to content

Commit 5b4ef94

Browse files
alexcrichtondicejrvolosatovs
authored
[37.0.0] More WASIp3 backports (#11654)
* support non-async `{stream,future}.cancel-{read,write}` (#11625) * support non-async `{stream,future}.cancel-{read,write}` During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well. This also adds `{Source,Destination}::reborrow` functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again. Note that I had to move some functions from `impl ConcurrentState` to `impl Instance` in order to access the store and suspend the current fiber when synchronously cancelling. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * reduce code duplication Signed-off-by: Joel Dice <joel.dice@fermyon.com> --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com> * support and test synchronous `{stream,future}.cancel-{read,write}` (#11645) * support and test synchronous `{stream,future}.cancel-{read,write}` Previously, we only supported async calls to those intrinsics; now we support blocking, synchronous calls as well. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * update future-read.wast test Signed-off-by: Joel Dice <joel.dice@fermyon.com> --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com> * p3-http: finish `wasi:http@0.3` implementation (#11636) * refactor(p3-http): use trappable errors Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * feat(p3-http): implement `content-length` handling Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor(p3-http): remove a few resource utilities Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * remove unused test import Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * fix(p3-http): close stream handles on drop Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * test(p3-http): stream responses back This is something we've been doing in wasip3, but I forgot to port this over Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * doc(p3-http): add missing docs, internalize more, simplify Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor(p3-http): extract `Body::consume` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor(p3-http): clean-up `content-length` error reporting Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor(p3-http): drop elided lifetime Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * fix(p3-http): avoid guest body deadlock hazard Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor(p3-http): add more docs, clean-up Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * doc(p3-http): add more docs Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * fix(p3-http): rework result future handling Most importantly this avoids a race condition between `content-length` error observed by `GuestBody` and hyper I/O driver Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * add new imports after rebase Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * clean-up `poll_consume` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * assert content-length `handle` results Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * relax `content_length` test `handle` assert Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> --------- Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-http: implementation follow-up (#11649) * p3: refactor future producers/consumers Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-http: tie lifetime of the spawned task to the bodies Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-http: improve docs Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> --------- Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * Ignore a wasip3 http test temporarily (#11657) Filed #11656 to track the eventual resolution. * don't delete sync-lowered subtasks unless they've exited (#11655) Previously, we were unconditionally deleting the callee subtask once it returned a value to a sync-lowered call, but that's only appropriate if the subtask has exited. Otherwise, it needs to keep running and only be deleted once it actually exits. Thanks to Luke for the `sync-streams.wast` test that uncovered this, which I've copied from the `component-model` repo. This also makes a couple of debug logging tweaks that proved useful while investigating the above issue. Signed-off-by: Joel Dice <joel.dice@fermyon.com> --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com> Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> Co-authored-by: Joel Dice <joel.dice@fermyon.com> Co-authored-by: Roman Volosatovs <rvolosatovs@users.noreply.github.com>
1 parent e9b656c commit 5b4ef94

File tree

29 files changed

+2826
-1208
lines changed

29 files changed

+2826
-1208
lines changed

crates/misc/component-async-tests/src/util.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
use wasmtime::{
1010
StoreContextMut,
1111
component::{
12-
Accessor, Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
12+
Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
1313
StreamProducer, StreamResult,
1414
},
1515
};
@@ -139,24 +139,41 @@ impl<T> OneshotProducer<T> {
139139
impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {
140140
type Item = T;
141141

142-
async fn produce(self, _: &Accessor<D>) -> Result<T> {
143-
Ok(self.0.await?)
142+
fn poll_produce(
143+
self: Pin<&mut Self>,
144+
cx: &mut Context<'_>,
145+
_: StoreContextMut<D>,
146+
finish: bool,
147+
) -> Poll<Result<Option<T>>> {
148+
match Pin::new(&mut self.get_mut().0).poll(cx) {
149+
Poll::Pending if finish => Poll::Ready(Ok(None)),
150+
Poll::Pending => Poll::Pending,
151+
Poll::Ready(result) => Poll::Ready(Ok(Some(result?))),
152+
}
144153
}
145154
}
146155

147-
pub struct OneshotConsumer<T>(oneshot::Sender<T>);
156+
pub struct OneshotConsumer<T>(Option<oneshot::Sender<T>>);
148157

149158
impl<T> OneshotConsumer<T> {
150159
pub fn new(tx: oneshot::Sender<T>) -> Self {
151-
Self(tx)
160+
Self(Some(tx))
152161
}
153162
}
154163

155-
impl<D, T: Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
164+
impl<D, T: Lift + Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
156165
type Item = T;
157166

158-
async fn consume(self, _: &Accessor<D>, value: T) -> Result<()> {
159-
_ = self.0.send(value);
160-
Ok(())
167+
fn poll_consume(
168+
self: Pin<&mut Self>,
169+
_: &mut Context<'_>,
170+
store: StoreContextMut<D>,
171+
mut source: Source<'_, T>,
172+
_: bool,
173+
) -> Poll<Result<()>> {
174+
let value = &mut None;
175+
source.read(store, value)?;
176+
_ = self.get_mut().0.take().unwrap().send(value.take().unwrap());
177+
Poll::Ready(Ok(()))
161178
}
162179
}

0 commit comments

Comments
 (0)