Skip to content

Commit ed62e2e

Browse files
committed
p3: refactor future producers/consumers
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent f3d7256 commit ed62e2e

File tree

3 files changed

+43
-29
lines changed

3 files changed

+43
-29
lines changed

crates/wasi-http/src/p3/body.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ impl http_body::Body for ConsumedBody {
372372
}
373373
}
374374

375-
pub(crate) struct GuestTrailerConsumer<T> {
376-
pub(crate) tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
377-
pub(crate) getter: fn(&mut T) -> WasiHttpCtxView<'_>,
375+
struct GuestTrailerConsumer<T> {
376+
tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
377+
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
378378
}
379379

380380
impl<D> FutureConsumer<D> for GuestTrailerConsumer<D>
@@ -387,12 +387,13 @@ where
387387
mut self: Pin<&mut Self>,
388388
_: &mut Context<'_>,
389389
mut store: StoreContextMut<D>,
390-
mut source: Source<'_, Self::Item>,
390+
mut src: Source<'_, Self::Item>,
391391
_: bool,
392392
) -> Poll<wasmtime::Result<()>> {
393-
let value = &mut None;
394-
source.read(store.as_context_mut(), value)?;
395-
let res = match value.take().unwrap() {
393+
let mut result = None;
394+
src.read(store.as_context_mut(), &mut result)
395+
.context("failed to read result")?;
396+
let res = match result.context("result value missing")? {
396397
Ok(Some(trailers)) => {
397398
let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());
398399
let trailers = table

crates/wasi-http/src/p3/host/types.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -129,34 +129,41 @@ fn parse_header_value(
129129
}
130130
}
131131

132-
struct GuestBodyResultProducer(
133-
Pin<Box<dyn Future<Output = wasmtime::Result<Result<(), ErrorCode>>> + Send>>,
134-
);
135-
136-
impl GuestBodyResultProducer {
137-
fn new(rx: oneshot::Receiver<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>) -> Self {
138-
Self(Box::pin(async move {
139-
let Ok(fut) = rx.await else {
140-
return Ok(Ok(()));
141-
};
142-
Ok(Box::into_pin(fut).await)
143-
}))
144-
}
132+
enum GuestBodyResultProducer {
133+
Receiver(oneshot::Receiver<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>),
134+
Future(Pin<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>),
145135
}
146136

147137
impl<D> FutureProducer<D> for GuestBodyResultProducer {
148138
type Item = Result<(), ErrorCode>;
149139

150140
fn poll_produce(
151-
self: Pin<&mut Self>,
141+
mut self: Pin<&mut Self>,
152142
cx: &mut Context<'_>,
153143
_: StoreContextMut<D>,
154144
finish: bool,
155145
) -> Poll<wasmtime::Result<Option<Self::Item>>> {
156-
match Pin::new(&mut self.get_mut().0).poll(cx) {
157-
Poll::Pending if finish => Poll::Ready(Ok(None)),
158-
Poll::Pending => Poll::Pending,
159-
Poll::Ready(result) => Poll::Ready(Ok(Some(result?))),
146+
match &mut *self {
147+
Self::Receiver(rx) => match Pin::new(rx).poll(cx) {
148+
Poll::Ready(Ok(fut)) => {
149+
let mut fut = Box::into_pin(fut);
150+
match fut.as_mut().poll(cx) {
151+
Poll::Ready(res) => Poll::Ready(Ok(Some(res))),
152+
Poll::Pending => {
153+
*self = Self::Future(fut);
154+
Poll::Pending
155+
}
156+
}
157+
}
158+
Poll::Ready(Err(..)) => Poll::Ready(Ok(Some(Ok(())))),
159+
Poll::Pending if finish => Poll::Ready(Ok(None)),
160+
Poll::Pending => Poll::Pending,
161+
},
162+
Self::Future(fut) => match fut.as_mut().poll(cx) {
163+
Poll::Ready(res) => Poll::Ready(Ok(Some(res))),
164+
Poll::Pending if finish => Poll::Ready(Ok(None)),
165+
Poll::Pending => Poll::Pending,
166+
},
160167
}
161168
}
162169
}
@@ -332,7 +339,7 @@ impl HostRequestWithStore for WasiHttp {
332339
FutureReader::new(
333340
instance,
334341
&mut store,
335-
GuestBodyResultProducer::new(result_rx),
342+
GuestBodyResultProducer::Receiver(result_rx),
336343
),
337344
))
338345
})
@@ -609,7 +616,7 @@ impl HostResponseWithStore for WasiHttp {
609616
FutureReader::new(
610617
instance,
611618
&mut store,
612-
GuestBodyResultProducer::new(result_rx),
619+
GuestBodyResultProducer::Receiver(result_rx),
613620
),
614621
))
615622
})

crates/wasi/src/p3/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ where
6767
_: StoreContextMut<D>,
6868
_: bool,
6969
) -> Poll<wasmtime::Result<Option<T>>> {
70-
Poll::Ready(Ok(Some(self.get_mut().0.take().unwrap())))
70+
let v = self
71+
.get_mut()
72+
.0
73+
.take()
74+
.context("polled after returning `Ready`")?;
75+
Poll::Ready(Ok(Some(v)))
7176
}
7277
}
7378

@@ -92,9 +97,10 @@ where
9297
finish: bool,
9398
) -> Poll<wasmtime::Result<Option<T>>> {
9499
match Pin::new(&mut self.get_mut().0).poll(cx) {
100+
Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
101+
Poll::Ready(Err(err)) => Poll::Ready(Err(err).context("oneshot sender dropped")),
95102
Poll::Pending if finish => Poll::Ready(Ok(None)),
96103
Poll::Pending => Poll::Pending,
97-
Poll::Ready(result) => Poll::Ready(Ok(Some(result.context("oneshot sender dropped")?))),
98104
}
99105
}
100106
}

0 commit comments

Comments
 (0)