Skip to content

Commit c933bdb

Browse files
committed
feat(p3-http): implement consume-body changes
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent d1139ae commit c933bdb

File tree

10 files changed

+88
-122
lines changed

10 files changed

+88
-122
lines changed

ci/vendor-wit.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ make_vendor "wasi-http/src/p3" "
8181
cli@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8282
clocks@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8383
filesystem@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
84-
http@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
84+
http@9cbc38b73433d1b0d08174aaabf0d5d0421ba87e@wit-0.3.0-draft
8585
random@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8686
sockets@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8787
"

crates/test-programs/src/bin/p3_http_echo.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ impl Handler for Component {
1515
/// Return a response which echoes the request headers, body, and trailers.
1616
async fn handle(request: Request) -> Result<Response, ErrorCode> {
1717
let headers = request.get_headers();
18-
let (body, trailers) = request.consume_body().unwrap();
19-
20-
// let (headers, body) = Request::into_parts(request);
18+
let (_, result_rx) = wit_future::new(|| Ok(()));
19+
let (body, trailers) = Request::consume_body(request, result_rx);
2120

2221
let (response, _result) = if false {
2322
// This is the easy and efficient way to do it...
@@ -47,7 +46,6 @@ impl Handler for Component {
4746
drop(pipe_tx);
4847

4948
trailers_tx.write(trailers.await).await.unwrap();
50-
drop(request);
5149
});
5250

5351
Response::new(headers, Some(pipe_rx), trailers_rx)

crates/test-programs/src/bin/p3_http_middleware.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ impl Handler for Component {
4848
}
4949
_ => true,
5050
});
51-
let (mut body, trailers) = request.consume_body().unwrap();
51+
let (_, result_rx) = wit_future::new(|| Ok(()));
52+
let (mut body, trailers) = Request::consume_body(request, result_rx);
5253

5354
let (body, trailers) = if content_deflated {
5455
// Next, spawn a task to pipe and decode the original request body and trailers into a new request
@@ -77,8 +78,6 @@ impl Handler for Component {
7778
}
7879

7980
trailers_tx.write(trailers.await).await.unwrap();
80-
81-
drop(request);
8281
});
8382

8483
(pipe_rx, trailers_rx)
@@ -110,7 +109,8 @@ impl Handler for Component {
110109
headers.push(("content-encoding".into(), b"deflate".into()));
111110
}
112111

113-
let (mut body, trailers) = response.consume_body().unwrap();
112+
let (_, result_rx) = wit_future::new(|| Ok(()));
113+
let (mut body, trailers) = Response::consume_body(response, result_rx);
114114
let (body, trailers) = if accept_deflated {
115115
headers.retain(|(name, _value)| name != "content-length");
116116

@@ -141,7 +141,6 @@ impl Handler for Component {
141141
}
142142

143143
trailers_tx.write(trailers.await).await.unwrap();
144-
drop(response);
145144
});
146145

147146
(pipe_rx, trailers_rx)

crates/test-programs/src/p3/http.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,8 @@ pub async fn request(
9494
let response = handler::handle(request).await?;
9595
let status = response.get_status_code();
9696
let headers = response.get_headers().copy_all();
97-
let (body_rx, trailers_rx) = response
98-
.consume_body()
99-
.expect("failed to get response body");
97+
let (_, result_rx) = wit_future::new(|| Ok(()));
98+
let (body_rx, trailers_rx) = types::Response::consume_body(response, result_rx);
10099
let ((), rx) = join!(
101100
async {
102101
if let Some(buf) = body {

crates/wasi-http/src/p3/bindings.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ mod generated {
99
"wasi:http/handler/[async]handle": async | store | trappable | tracing,
1010
"wasi:http/types/[drop]request": store | trappable | tracing,
1111
"wasi:http/types/[drop]response": store | trappable | tracing,
12-
"wasi:http/types/[method]request.consume-body": async | store | trappable | tracing,
13-
"wasi:http/types/[method]response.consume-body": async | store | trappable | tracing,
12+
"wasi:http/types/[static]request.consume-body": async | store | trappable | tracing,
1413
"wasi:http/types/[static]request.new": async | store | trappable | tracing,
14+
"wasi:http/types/[static]response.consume-body": async | store | trappable | tracing,
1515
"wasi:http/types/[static]response.new": async | store | trappable | tracing,
1616
default: trappable | tracing,
1717
},

crates/wasi-http/src/p3/body.rs

Lines changed: 43 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -37,55 +37,72 @@ pub(crate) enum Body {
3737
/// Channel, on which transmission result will be written
3838
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
3939
},
40-
/// Body is consumed.
41-
Consumed,
40+
}
41+
42+
/// [FutureConsumer] implementation for future passed to `consume-body`.
43+
struct BodyResultConsumer(
44+
Option<oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>>,
45+
);
46+
47+
impl<D> FutureConsumer<D> for BodyResultConsumer
48+
where
49+
D: 'static,
50+
{
51+
type Item = Result<(), ErrorCode>;
52+
53+
fn poll_consume(
54+
mut self: Pin<&mut Self>,
55+
_: &mut Context<'_>,
56+
store: StoreContextMut<D>,
57+
mut src: Source<'_, Self::Item>,
58+
_: bool,
59+
) -> Poll<wasmtime::Result<()>> {
60+
let mut res = None;
61+
src.read(store, &mut res).context("failed to read result")?;
62+
let res = res.context("result value missing")?;
63+
let tx = self.0.take().context("polled after returning `Ready`")?;
64+
_ = tx.send(Box::new(async { res }));
65+
Poll::Ready(Ok(()))
66+
}
4267
}
4368

4469
impl Body {
4570
/// Implementation of `consume-body` shared between requests and responses
4671
pub(crate) fn consume<T>(
4772
self,
4873
mut store: Access<'_, T, WasiHttp>,
74+
fut: FutureReader<Result<(), ErrorCode>>,
4975
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
50-
) -> Result<
51-
(
52-
StreamReader<u8>,
53-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
54-
),
55-
(),
56-
> {
76+
) -> (
77+
StreamReader<u8>,
78+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
79+
) {
5780
match self {
5881
Body::Guest {
5982
contents_rx: Some(contents_rx),
6083
trailers_rx,
6184
result_tx,
6285
} => {
63-
// TODO: Use a result specified by the caller
64-
// https://github.com/WebAssembly/wasi-http/issues/176
65-
_ = result_tx.send(Box::new(async { Ok(()) }));
66-
Ok((contents_rx, trailers_rx))
86+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
87+
(contents_rx, trailers_rx)
6788
}
6889
Body::Guest {
6990
contents_rx: None,
7091
trailers_rx,
7192
result_tx,
7293
} => {
94+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
7395
let instance = store.instance();
74-
// TODO: Use a result specified by the caller
75-
// https://github.com/WebAssembly/wasi-http/issues/176
76-
_ = result_tx.send(Box::new(async { Ok(()) }));
77-
Ok((
96+
(
7897
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
7998
trailers_rx,
80-
))
99+
)
81100
}
82101
Body::Host { body, result_tx } => {
102+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
83103
let instance = store.instance();
84-
// TODO: Use a result specified by the caller
85-
// https://github.com/WebAssembly/wasi-http/issues/176
86-
_ = result_tx.send(Box::new(async { Ok(()) }));
87104
let (trailers_tx, trailers_rx) = oneshot::channel();
88-
Ok((
105+
(
89106
StreamReader::new(
90107
instance,
91108
&mut store,
@@ -100,9 +117,8 @@ impl Body {
100117
&mut store,
101118
FutureOneshotProducer::from(trailers_rx),
102119
),
103-
))
120+
)
104121
}
105-
Body::Consumed => Err(()),
106122
}
107123
}
108124

@@ -367,31 +383,6 @@ impl http_body::Body for GuestBody {
367383
}
368384
}
369385

370-
/// [http_body::Body] that has been consumed.
371-
pub(crate) struct ConsumedBody;
372-
373-
impl http_body::Body for ConsumedBody {
374-
type Data = Bytes;
375-
type Error = ErrorCode;
376-
377-
fn poll_frame(
378-
self: Pin<&mut Self>,
379-
_cx: &mut Context<'_>,
380-
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
381-
Poll::Ready(Some(Err(ErrorCode::InternalError(Some(
382-
"body consumed".into(),
383-
)))))
384-
}
385-
386-
fn is_end_stream(&self) -> bool {
387-
true
388-
}
389-
390-
fn size_hint(&self) -> http_body::SizeHint {
391-
http_body::SizeHint::with_exact(0)
392-
}
393-
}
394-
395386
/// [FutureConsumer] implementation for trailers originating in the guest.
396387
struct GuestTrailerConsumer<T> {
397388
tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
@@ -411,10 +402,10 @@ where
411402
mut src: Source<'_, Self::Item>,
412403
_: bool,
413404
) -> Poll<wasmtime::Result<()>> {
414-
let mut result = None;
415-
src.read(store.as_context_mut(), &mut result)
405+
let mut res = None;
406+
src.read(&mut store, &mut res)
416407
.context("failed to read result")?;
417-
let res = match result.context("result value missing")? {
408+
let res = match res.context("result value missing")? {
418409
Ok(Some(trailers)) => {
419410
let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());
420411
let trailers = table

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::p3::bindings::http::handler::{Host, HostWithStore};
22
use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
3-
use crate::p3::body::{Body, BodyKind, ConsumedBody, GuestBody};
3+
use crate::p3::body::{Body, BodyKind, GuestBody};
44
use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView, get_content_length};
55
use anyhow::Context as _;
66
use core::pin::Pin;
@@ -160,7 +160,6 @@ impl HostWithStore for WasiHttp {
160160
_ = result_tx.send(Box::new(io_task_result(io_result_rx)));
161161
body.with_state(io_task_rx).boxed()
162162
}
163-
Body::Consumed => ConsumedBody.boxed(),
164163
};
165164

166165
let WasiHttpCtxView { ctx, .. } = store.get();

crates/wasi-http/src/p3/host/types.rs

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::p3::bindings::http::types::{
77
use crate::p3::body::Body;
88
use crate::p3::{HeaderResult, HttpError, RequestOptionsResult, WasiHttp, WasiHttpCtxView};
99
use anyhow::Context as _;
10-
use core::mem;
1110
use core::pin::Pin;
1211
use core::task::{Context, Poll};
1312
use http::header::CONTENT_LENGTH;
@@ -347,20 +346,19 @@ impl HostRequestWithStore for WasiHttp {
347346
async fn consume_body<T>(
348347
store: &Accessor<T, Self>,
349348
req: Resource<Request>,
350-
) -> wasmtime::Result<
351-
Result<
352-
(
353-
StreamReader<u8>,
354-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
355-
),
356-
(),
357-
>,
358-
> {
349+
fut: FutureReader<Result<(), ErrorCode>>,
350+
) -> wasmtime::Result<(
351+
StreamReader<u8>,
352+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
353+
)> {
359354
let getter = store.getter();
360355
store.with(|mut store| {
361-
let Request { body, .. } = get_request_mut(store.get().table, &req)?;
362-
let body = mem::replace(body, Body::Consumed);
363-
Ok(body.consume(store, getter))
356+
let Request { body, .. } = store
357+
.get()
358+
.table
359+
.delete(req)
360+
.context("failed to delete request from table")?;
361+
Ok(body.consume(store, fut, getter))
364362
})
365363
}
366364

@@ -624,20 +622,19 @@ impl HostResponseWithStore for WasiHttp {
624622
async fn consume_body<T>(
625623
store: &Accessor<T, Self>,
626624
res: Resource<Response>,
627-
) -> wasmtime::Result<
628-
Result<
629-
(
630-
StreamReader<u8>,
631-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
632-
),
633-
(),
634-
>,
635-
> {
625+
fut: FutureReader<Result<(), ErrorCode>>,
626+
) -> wasmtime::Result<(
627+
StreamReader<u8>,
628+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
629+
)> {
636630
let getter = store.getter();
637631
store.with(|mut store| {
638-
let Response { body, .. } = get_response_mut(store.get().table, &res)?;
639-
let body = mem::replace(body, Body::Consumed);
640-
Ok(body.consume(store, getter))
632+
let Response { body, .. } = store
633+
.get()
634+
.table
635+
.delete(res)
636+
.context("failed to delete response from table")?;
637+
Ok(body.consume(store, fut, getter))
641638
})
642639
}
643640

crates/wasi-http/src/p3/response.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::p3::bindings::http::types::ErrorCode;
2-
use crate::p3::body::{Body, BodyKind, ConsumedBody, GuestBody};
2+
use crate::p3::body::{Body, BodyKind, GuestBody};
33
use crate::p3::{WasiHttpView, get_content_length};
44
use anyhow::Context as _;
55
use bytes::Bytes;
@@ -81,7 +81,6 @@ impl Response {
8181
_ = result_tx.send(Box::new(fut));
8282
body
8383
}
84-
Body::Consumed => ConsumedBody.boxed(),
8584
};
8685
Ok(http::Response::from_parts(res, body))
8786
}

crates/wasi-http/src/p3/wit/deps/http/types.wit

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -316,20 +316,12 @@ interface types {
316316
/// future to determine whether the body was received successfully.
317317
/// The future will only resolve after the stream is reported as closed.
318318
///
319-
/// The stream and future returned by this method are children:
320-
/// they should be closed or consumed before the parent `response`
321-
/// is dropped, or its ownership is transferred to another component
322-
/// by e.g. `handler.handle`.
319+
/// This function takes a `res` future as a parameter, which can be used to
320+
/// communicate an error in handling of the request.
323321
///
324-
/// This method may be called multiple times.
325-
///
326-
/// This method will return an error if it is called while either:
327-
/// - a stream or future returned by a previous call to this method is still open
328-
/// - a stream returned by a previous call to this method has reported itself as closed
329-
/// Thus there will always be at most one readable stream open for a given body.
330-
/// Each subsequent stream picks up where the previous one left off,
331-
/// continuing until the entire body has been consumed.
332-
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
322+
/// Note that function will move the `request`, but references to headers or
323+
/// request options acquired from it previously will remain valid.
324+
consume-body: static func(this: request, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
333325
}
334326

335327
/// Parameters for making an HTTP Request. Each of these parameters is
@@ -417,19 +409,11 @@ interface types {
417409
/// future to determine whether the body was received successfully.
418410
/// The future will only resolve after the stream is reported as closed.
419411
///
420-
/// The stream and future returned by this method are children:
421-
/// they should be closed or consumed before the parent `response`
422-
/// is dropped, or its ownership is transferred to another component
423-
/// by e.g. `handler.handle`.
424-
///
425-
/// This method may be called multiple times.
412+
/// This function takes a `res` future as a parameter, which can be used to
413+
/// communicate an error in handling of the response.
426414
///
427-
/// This method will return an error if it is called while either:
428-
/// - a stream or future returned by a previous call to this method is still open
429-
/// - a stream returned by a previous call to this method has reported itself as closed
430-
/// Thus there will always be at most one readable stream open for a given body.
431-
/// Each subsequent stream picks up where the previous one left off,
432-
/// continuing until the entire body has been consumed.
433-
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
415+
/// Note that function will move the `response`, but references to headers
416+
/// acquired from it previously will remain valid.
417+
consume-body: static func(this: response, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
434418
}
435419
}

0 commit comments

Comments
 (0)