Skip to content

Commit 263e690

Browse files
committed
feat(p3-http): implement consume-body changes
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 625a3e3 commit 263e690

File tree

10 files changed

+88
-122
lines changed

10 files changed

+88
-122
lines changed

ci/vendor-wit.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ make_vendor "wasi-http/src/p3" "
8181
cli@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8282
clocks@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8383
filesystem@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
84-
http@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
84+
http@9cbc38b73433d1b0d08174aaabf0d5d0421ba87e@wit-0.3.0-draft
8585
random@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8686
sockets@v0.3.0-rc-2025-08-15@wit-0.3.0-draft
8787
"

crates/test-programs/src/bin/p3_http_echo.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ impl Handler for Component {
1515
/// Return a response which echoes the request headers, body, and trailers.
1616
async fn handle(request: Request) -> Result<Response, ErrorCode> {
1717
let headers = request.get_headers();
18-
let (body, trailers) = request.consume_body().unwrap();
19-
20-
// let (headers, body) = Request::into_parts(request);
18+
let (_, result_rx) = wit_future::new(|| Ok(()));
19+
let (body, trailers) = Request::consume_body(request, result_rx);
2120

2221
let (response, _result) = if false {
2322
// This is the easy and efficient way to do it...
@@ -47,7 +46,6 @@ impl Handler for Component {
4746
drop(pipe_tx);
4847

4948
trailers_tx.write(trailers.await).await.unwrap();
50-
drop(request);
5149
});
5250

5351
Response::new(headers, Some(pipe_rx), trailers_rx)

crates/test-programs/src/bin/p3_http_middleware.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ impl Handler for Component {
4848
}
4949
_ => true,
5050
});
51-
let (mut body, trailers) = request.consume_body().unwrap();
51+
let (_, result_rx) = wit_future::new(|| Ok(()));
52+
let (mut body, trailers) = Request::consume_body(request, result_rx);
5253

5354
let (body, trailers) = if content_deflated {
5455
// Next, spawn a task to pipe and decode the original request body and trailers into a new request
@@ -77,8 +78,6 @@ impl Handler for Component {
7778
}
7879

7980
trailers_tx.write(trailers.await).await.unwrap();
80-
81-
drop(request);
8281
});
8382

8483
(pipe_rx, trailers_rx)
@@ -110,7 +109,8 @@ impl Handler for Component {
110109
headers.push(("content-encoding".into(), b"deflate".into()));
111110
}
112111

113-
let (mut body, trailers) = response.consume_body().unwrap();
112+
let (_, result_rx) = wit_future::new(|| Ok(()));
113+
let (mut body, trailers) = Response::consume_body(response, result_rx);
114114
let (body, trailers) = if accept_deflated {
115115
headers.retain(|(name, _value)| name != "content-length");
116116

@@ -141,7 +141,6 @@ impl Handler for Component {
141141
}
142142

143143
trailers_tx.write(trailers.await).await.unwrap();
144-
drop(response);
145144
});
146145

147146
(pipe_rx, trailers_rx)

crates/test-programs/src/p3/http.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,8 @@ pub async fn request(
9494
let response = handler::handle(request).await?;
9595
let status = response.get_status_code();
9696
let headers = response.get_headers().copy_all();
97-
let (body_rx, trailers_rx) = response
98-
.consume_body()
99-
.expect("failed to get response body");
97+
let (_, result_rx) = wit_future::new(|| Ok(()));
98+
let (body_rx, trailers_rx) = types::Response::consume_body(response, result_rx);
10099
let ((), rx) = join!(
101100
async {
102101
if let Some(buf) = body {

crates/wasi-http/src/p3/bindings.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ mod generated {
99
"wasi:http/handler/[async]handle": async | store | trappable | tracing,
1010
"wasi:http/types/[drop]request": store | trappable | tracing,
1111
"wasi:http/types/[drop]response": store | trappable | tracing,
12-
"wasi:http/types/[method]request.consume-body": async | store | trappable | tracing,
13-
"wasi:http/types/[method]response.consume-body": async | store | trappable | tracing,
12+
"wasi:http/types/[static]request.consume-body": async | store | trappable | tracing,
1413
"wasi:http/types/[static]request.new": async | store | trappable | tracing,
14+
"wasi:http/types/[static]response.consume-body": async | store | trappable | tracing,
1515
"wasi:http/types/[static]response.new": async | store | trappable | tracing,
1616
default: trappable | tracing,
1717
},

crates/wasi-http/src/p3/body.rs

Lines changed: 43 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -37,55 +37,72 @@ pub(crate) enum Body {
3737
/// Channel, on which transmission result will be written
3838
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
3939
},
40-
/// Body is consumed.
41-
Consumed,
40+
}
41+
42+
/// [FutureConsumer] implementation for future passed to `consume-body`.
43+
struct BodyResultConsumer(
44+
Option<oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>>,
45+
);
46+
47+
impl<D> FutureConsumer<D> for BodyResultConsumer
48+
where
49+
D: 'static,
50+
{
51+
type Item = Result<(), ErrorCode>;
52+
53+
fn poll_consume(
54+
mut self: Pin<&mut Self>,
55+
_: &mut Context<'_>,
56+
store: StoreContextMut<D>,
57+
mut src: Source<'_, Self::Item>,
58+
_: bool,
59+
) -> Poll<wasmtime::Result<()>> {
60+
let mut res = None;
61+
src.read(store, &mut res).context("failed to read result")?;
62+
let res = res.context("result value missing")?;
63+
let tx = self.0.take().context("polled after returning `Ready`")?;
64+
_ = tx.send(Box::new(async { res }));
65+
Poll::Ready(Ok(()))
66+
}
4267
}
4368

4469
impl Body {
4570
/// Implementation of `consume-body` shared between requests and responses
4671
pub(crate) fn consume<T>(
4772
self,
4873
mut store: Access<'_, T, WasiHttp>,
74+
fut: FutureReader<Result<(), ErrorCode>>,
4975
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
50-
) -> Result<
51-
(
52-
StreamReader<u8>,
53-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
54-
),
55-
(),
56-
> {
76+
) -> (
77+
StreamReader<u8>,
78+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
79+
) {
5780
match self {
5881
Body::Guest {
5982
contents_rx: Some(contents_rx),
6083
trailers_rx,
6184
result_tx,
6285
} => {
63-
// TODO: Use a result specified by the caller
64-
// https://github.com/WebAssembly/wasi-http/issues/176
65-
_ = result_tx.send(Box::new(async { Ok(()) }));
66-
Ok((contents_rx, trailers_rx))
86+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
87+
(contents_rx, trailers_rx)
6788
}
6889
Body::Guest {
6990
contents_rx: None,
7091
trailers_rx,
7192
result_tx,
7293
} => {
94+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
7395
let instance = store.instance();
74-
// TODO: Use a result specified by the caller
75-
// https://github.com/WebAssembly/wasi-http/issues/176
76-
_ = result_tx.send(Box::new(async { Ok(()) }));
77-
Ok((
96+
(
7897
StreamReader::new(instance, &mut store, iter::empty()),
7998
trailers_rx,
80-
))
99+
)
81100
}
82101
Body::Host { body, result_tx } => {
102+
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
83103
let instance = store.instance();
84-
// TODO: Use a result specified by the caller
85-
// https://github.com/WebAssembly/wasi-http/issues/176
86-
_ = result_tx.send(Box::new(async { Ok(()) }));
87104
let (trailers_tx, trailers_rx) = oneshot::channel();
88-
Ok((
105+
(
89106
StreamReader::new(
90107
instance,
91108
&mut store,
@@ -96,9 +113,8 @@ impl Body {
96113
},
97114
),
98115
FutureReader::new(instance, &mut store, trailers_rx),
99-
))
116+
)
100117
}
101-
Body::Consumed => Err(()),
102118
}
103119
}
104120

@@ -390,31 +406,6 @@ impl http_body::Body for GuestBody {
390406
}
391407
}
392408

393-
/// [http_body::Body] that has been consumed.
394-
pub(crate) struct ConsumedBody;
395-
396-
impl http_body::Body for ConsumedBody {
397-
type Data = Bytes;
398-
type Error = ErrorCode;
399-
400-
fn poll_frame(
401-
self: Pin<&mut Self>,
402-
_cx: &mut Context<'_>,
403-
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
404-
Poll::Ready(Some(Err(ErrorCode::InternalError(Some(
405-
"body consumed".into(),
406-
)))))
407-
}
408-
409-
fn is_end_stream(&self) -> bool {
410-
true
411-
}
412-
413-
fn size_hint(&self) -> http_body::SizeHint {
414-
http_body::SizeHint::with_exact(0)
415-
}
416-
}
417-
418409
/// [FutureConsumer] implementation for trailers originating in the guest.
419410
struct GuestTrailerConsumer<T> {
420411
tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
@@ -434,10 +425,10 @@ where
434425
mut src: Source<'_, Self::Item>,
435426
_: bool,
436427
) -> Poll<wasmtime::Result<()>> {
437-
let mut result = None;
438-
src.read(store.as_context_mut(), &mut result)
428+
let mut res = None;
429+
src.read(&mut store, &mut res)
439430
.context("failed to read result")?;
440-
let res = match result.context("result value missing")? {
431+
let res = match res.context("result value missing")? {
441432
Ok(Some(trailers)) => {
442433
let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());
443434
let trailers = table

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::get_content_length;
22
use crate::p3::bindings::http::handler::{Host, HostWithStore};
33
use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
4-
use crate::p3::body::{Body, ConsumedBody, GuestBody};
4+
use crate::p3::body::{Body, GuestBody};
55
use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
66
use anyhow::Context as _;
77
use bytes::Bytes;
@@ -275,7 +275,6 @@ impl HostWithStore for WasiHttp {
275275
body.with_state(io_task_rx).boxed()
276276
}
277277
}
278-
Body::Consumed => ConsumedBody.boxed(),
279278
};
280279

281280
let WasiHttpCtxView { ctx, .. } = store.get();

crates/wasi-http/src/p3/host/types.rs

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::p3::bindings::http::types::{
77
use crate::p3::body::Body;
88
use crate::p3::{HeaderResult, HttpError, RequestOptionsResult, WasiHttp, WasiHttpCtxView};
99
use anyhow::Context as _;
10-
use core::mem;
1110
use core::pin::Pin;
1211
use core::task::{Context, Poll, ready};
1312
use http::header::CONTENT_LENGTH;
@@ -356,20 +355,19 @@ impl HostRequestWithStore for WasiHttp {
356355
async fn consume_body<T>(
357356
store: &Accessor<T, Self>,
358357
req: Resource<Request>,
359-
) -> wasmtime::Result<
360-
Result<
361-
(
362-
StreamReader<u8>,
363-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
364-
),
365-
(),
366-
>,
367-
> {
358+
fut: FutureReader<Result<(), ErrorCode>>,
359+
) -> wasmtime::Result<(
360+
StreamReader<u8>,
361+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
362+
)> {
368363
let getter = store.getter();
369364
store.with(|mut store| {
370-
let Request { body, .. } = get_request_mut(store.get().table, &req)?;
371-
let body = mem::replace(body, Body::Consumed);
372-
Ok(body.consume(store, getter))
365+
let Request { body, .. } = store
366+
.get()
367+
.table
368+
.delete(req)
369+
.context("failed to delete request from table")?;
370+
Ok(body.consume(store, fut, getter))
373371
})
374372
}
375373

@@ -633,20 +631,19 @@ impl HostResponseWithStore for WasiHttp {
633631
async fn consume_body<T>(
634632
store: &Accessor<T, Self>,
635633
res: Resource<Response>,
636-
) -> wasmtime::Result<
637-
Result<
638-
(
639-
StreamReader<u8>,
640-
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
641-
),
642-
(),
643-
>,
644-
> {
634+
fut: FutureReader<Result<(), ErrorCode>>,
635+
) -> wasmtime::Result<(
636+
StreamReader<u8>,
637+
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
638+
)> {
645639
let getter = store.getter();
646640
store.with(|mut store| {
647-
let Response { body, .. } = get_response_mut(store.get().table, &res)?;
648-
let body = mem::replace(body, Body::Consumed);
649-
Ok(body.consume(store, getter))
641+
let Response { body, .. } = store
642+
.get()
643+
.table
644+
.delete(res)
645+
.context("failed to delete response from table")?;
646+
Ok(body.consume(store, fut, getter))
650647
})
651648
}
652649

crates/wasi-http/src/p3/response.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::get_content_length;
22
use crate::p3::bindings::http::types::ErrorCode;
3-
use crate::p3::body::{Body, ConsumedBody, GuestBody};
3+
use crate::p3::body::{Body, GuestBody};
44
use crate::p3::{WasiHttpCtxView, WasiHttpView};
55
use anyhow::Context as _;
66
use bytes::Bytes;
@@ -86,7 +86,6 @@ impl Response {
8686
_ = result_tx.send(Box::new(fut));
8787
body
8888
}
89-
Body::Consumed => ConsumedBody.boxed(),
9089
};
9190
Ok(http::Response::from_parts(res, body))
9291
}

crates/wasi-http/src/p3/wit/deps/http/types.wit

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -316,20 +316,12 @@ interface types {
316316
/// future to determine whether the body was received successfully.
317317
/// The future will only resolve after the stream is reported as closed.
318318
///
319-
/// The stream and future returned by this method are children:
320-
/// they should be closed or consumed before the parent `response`
321-
/// is dropped, or its ownership is transferred to another component
322-
/// by e.g. `handler.handle`.
319+
/// This function takes a `res` future as a parameter, which can be used to
320+
/// communicate an error in handling of the request.
323321
///
324-
/// This method may be called multiple times.
325-
///
326-
/// This method will return an error if it is called while either:
327-
/// - a stream or future returned by a previous call to this method is still open
328-
/// - a stream returned by a previous call to this method has reported itself as closed
329-
/// Thus there will always be at most one readable stream open for a given body.
330-
/// Each subsequent stream picks up where the previous one left off,
331-
/// continuing until the entire body has been consumed.
332-
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
322+
/// Note that function will move the `request`, but references to headers or
323+
/// request options acquired from it previously will remain valid.
324+
consume-body: static func(this: request, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
333325
}
334326

335327
/// Parameters for making an HTTP Request. Each of these parameters is
@@ -417,19 +409,11 @@ interface types {
417409
/// future to determine whether the body was received successfully.
418410
/// The future will only resolve after the stream is reported as closed.
419411
///
420-
/// The stream and future returned by this method are children:
421-
/// they should be closed or consumed before the parent `response`
422-
/// is dropped, or its ownership is transferred to another component
423-
/// by e.g. `handler.handle`.
424-
///
425-
/// This method may be called multiple times.
412+
/// This function takes a `res` future as a parameter, which can be used to
413+
/// communicate an error in handling of the response.
426414
///
427-
/// This method will return an error if it is called while either:
428-
/// - a stream or future returned by a previous call to this method is still open
429-
/// - a stream returned by a previous call to this method has reported itself as closed
430-
/// Thus there will always be at most one readable stream open for a given body.
431-
/// Each subsequent stream picks up where the previous one left off,
432-
/// continuing until the entire body has been consumed.
433-
consume-body: func() -> result<tuple<stream<u8>, future<result<option<trailers>, error-code>>>>;
415+
/// Note that function will move the `response`, but references to headers
416+
/// acquired from it previously will remain valid.
417+
consume-body: static func(this: response, res: future<result<_, error-code>>) -> tuple<stream<u8>, future<result<option<trailers>, error-code>>>;
434418
}
435419
}

0 commit comments

Comments
 (0)