Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 0c2d68c

Browse files
authored
Merge pull request #232 from alexcrichton/chip-away-more
Replace trailers futures with actual trailers
2 parents f804310 + 47e3d0f commit 0c2d68c

File tree

4 files changed

+139
-180
lines changed

4 files changed

+139
-180
lines changed

crates/wasi-http/src/p3/body.rs

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,33 @@
1-
use core::future::Future;
1+
use crate::p3::ResourceView;
2+
use crate::p3::bindings::http::types::ErrorCode;
3+
use anyhow::Context as _;
4+
use bytes::{Buf, Bytes, BytesMut};
25
use core::mem;
36
use core::pin::Pin;
47
use core::task::{Context, Poll, ready};
5-
6-
use bytes::{Buf, Bytes, BytesMut};
78
use http::HeaderMap;
89
use http_body::Frame;
910
use http_body_util::BodyExt as _;
1011
use http_body_util::combinators::BoxBody;
1112
use pin_project_lite::pin_project;
1213
use tokio::sync::mpsc;
13-
use wasmtime::component::{FutureWriter, Resource, StreamReader};
14+
use tokio::sync::oneshot;
15+
use wasmtime::component::{Accessor, FutureReader, FutureWriter, HasData, Resource, StreamReader};
1416
use wasmtime_wasi::p3::WithChildren;
1517

16-
use crate::p3::bindings::http::types::ErrorCode;
17-
18-
pub(crate) type OutgoingTrailerFuture = Pin<
19-
Box<
20-
dyn Future<Output = Option<Result<Option<Resource<WithChildren<HeaderMap>>>, ErrorCode>>>
21-
+ Send
22-
+ 'static,
23-
>,
24-
>;
25-
2618
pub(crate) fn empty_body() -> impl http_body::Body<Data = Bytes, Error = Option<ErrorCode>> {
2719
http_body_util::Empty::new().map_err(|_| None)
2820
}
2921

22+
/// Type for trailers that are received directly from the guest.
23+
pub type GuestTrailers = Result<Option<Resource<WithChildren<HeaderMap>>>, ErrorCode>;
24+
3025
/// A body frame
3126
pub enum BodyFrame {
3227
/// Data frame
3328
Data(Bytes),
3429
/// Trailer frame, this is the last frame of the body and it includes the transmit/receipt result
35-
Trailers(Result<Option<Resource<WithChildren<HeaderMap>>>, ErrorCode>),
30+
Trailers(GuestTrailers),
3631
}
3732

3833
/// Whether the body is a request or response body.
@@ -59,9 +54,9 @@ pub enum Body {
5954
/// Body constructed by the guest
6055
Guest {
6156
/// The body stream
62-
contents: BodyGuestContents,
57+
contents: MaybeTombstone<StreamReader<BytesMut>>,
6358
/// Future, on which guest will write result and optional trailers
64-
trailers: Option<OutgoingTrailerFuture>,
59+
trailers: MaybeTombstone<FutureReader<GuestTrailers>>,
6560
/// Buffered frame, if any
6661
buffer: Option<BodyFrame>,
6762
/// Future, on which transmission result will be written
@@ -81,16 +76,16 @@ pub enum Body {
8176
}
8277

8378
/// Variants of `Body::Guest::contents`.
84-
pub enum BodyGuestContents {
85-
/// The guest body is this provided stream.
86-
Some(StreamReader<BytesMut>),
79+
pub enum MaybeTombstone<T> {
80+
/// The provided value is available.
81+
Some(T),
8782

88-
/// The guest body was previously taken into a body task, and that body task
89-
/// has finished.
83+
/// The guest body item was previously taken into a body task, and that body
84+
/// task has finished.
9085
///
9186
/// In this situation the guest body can no longer be read due to a bug in
9287
/// Wasmtime where cancellation of an in-progress read is not yet supported.
93-
Taken,
88+
Tombstone,
9489

9590
/// The guest body is not provided.
9691
None,
@@ -411,3 +406,32 @@ impl http_body::Body for IncomingResponseBody {
411406
self.incoming.size_hint()
412407
}
413408
}
409+
410+
pub(crate) async fn handle_guest_trailers<T, D>(
411+
accessor: &Accessor<T, D>,
412+
guest_trailers: FutureReader<GuestTrailers>,
413+
host_trailers: oneshot::Sender<Result<http::HeaderMap, Option<ErrorCode>>>,
414+
) -> wasmtime::Result<()>
415+
where
416+
D: HasData,
417+
for<'a> D::Data<'a>: ResourceView,
418+
{
419+
match guest_trailers.read().await {
420+
Some(Ok(Some(trailers))) => {
421+
let trailers = accessor.with(|mut store| {
422+
let mut binding = store.get();
423+
let table = binding.table();
424+
table.delete(trailers).context("failed to delete trailers")
425+
})?;
426+
let trailers = trailers.unwrap_or_clone()?;
427+
_ = host_trailers.send(Ok(trailers));
428+
}
429+
Some(Err(err)) => {
430+
_ = host_trailers.send(Err(Some(err)));
431+
}
432+
// If no trailers were explicitly sent, or if nothing was sent at all,
433+
// then interpret that as no trailers.
434+
Some(Ok(None)) | None => {}
435+
}
436+
Ok(())
437+
}

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 32 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,24 @@ use super::{delete_request, get_fields_inner, push_response};
22
use crate::p3::bindings::http::handler;
33
use crate::p3::bindings::http::types::ErrorCode;
44
use crate::p3::{
5-
Body, BodyChannel, BodyFrame, BodyGuestContents, BodyWithContentLength, Client as _,
6-
ContentLength, DEFAULT_BUFFER_CAPACITY, OutgoingTrailerFuture, Request, Response, WasiHttp,
7-
WasiHttpImpl, WasiHttpView, empty_body,
5+
Body, BodyChannel, BodyFrame, BodyWithContentLength, Client as _, ContentLength,
6+
DEFAULT_BUFFER_CAPACITY, MaybeTombstone, Request, Response, WasiHttp, WasiHttpImpl,
7+
WasiHttpView, empty_body, handle_guest_trailers,
88
};
99
use anyhow::bail;
1010
use bytes::{Bytes, BytesMut};
1111
use core::iter;
1212
use futures::StreamExt as _;
1313
use http::header::HOST;
14-
use http::{HeaderMap, HeaderValue, Uri};
14+
use http::{HeaderValue, Uri};
1515
use http_body_util::{BodyExt as _, BodyStream, StreamBody};
1616
use std::sync::Arc;
1717
use tokio::sync::mpsc;
1818
use tokio::sync::oneshot;
1919
use tracing::debug;
20-
use wasmtime::component::{Accessor, AccessorTask, Resource};
20+
use wasmtime::component::{Accessor, Resource};
2121
use wasmtime_wasi::p3::{AbortOnDropHandle, ResourceView as _, SpawnExt};
2222

23-
struct TrailerTask {
24-
rx: OutgoingTrailerFuture,
25-
tx: oneshot::Sender<Result<Option<http::HeaderMap>, ErrorCode>>,
26-
}
27-
28-
impl<T, U: WasiHttpView> AccessorTask<T, WasiHttp<U>, wasmtime::Result<()>> for TrailerTask
29-
where
30-
U: 'static,
31-
{
32-
async fn run(self, store: &Accessor<T, WasiHttp<U>>) -> wasmtime::Result<()> {
33-
match self.rx.await {
34-
Some(Ok(trailers)) => store.with(|mut view| {
35-
let mut binding = view.get();
36-
let trailers = trailers
37-
.map(|trailers| get_fields_inner(binding.table(), &trailers))
38-
.transpose()?;
39-
_ = self.tx.send(Ok(trailers.as_deref().cloned()));
40-
Ok(())
41-
}),
42-
Some(Err(err)) => {
43-
_ = self.tx.send(Err(err));
44-
Ok(())
45-
}
46-
None => Ok(()),
47-
}
48-
}
49-
}
50-
5123
impl<T> handler::HostConcurrent for WasiHttp<T>
5224
where
5325
T: WasiHttpView + 'static,
@@ -125,7 +97,7 @@ where
12597
let (request, ()) = request.into_parts();
12698
let response = match body {
12799
Body::Guest {
128-
contents: BodyGuestContents::None,
100+
contents: MaybeTombstone::None,
129101
buffer: Some(BodyFrame::Trailers(Ok(None))) | None,
130102
tx,
131103
content_length: Some(ContentLength { limit, sent }),
@@ -139,8 +111,8 @@ where
139111
return Ok(Err(ErrorCode::HttpRequestBodySize(Some(sent))));
140112
}
141113
Body::Guest {
142-
contents: BodyGuestContents::None,
143-
trailers: None,
114+
contents: MaybeTombstone::None,
115+
trailers: MaybeTombstone::None,
144116
buffer: Some(BodyFrame::Trailers(Ok(None))),
145117
tx,
146118
content_length: None,
@@ -163,8 +135,8 @@ where
163135
}
164136
}
165137
Body::Guest {
166-
contents: BodyGuestContents::None,
167-
trailers: None,
138+
contents: MaybeTombstone::None,
139+
trailers: MaybeTombstone::None,
168140
buffer: Some(BodyFrame::Trailers(Ok(Some(trailers)))),
169141
tx,
170142
content_length: None,
@@ -191,8 +163,8 @@ where
191163
}
192164
}
193165
Body::Guest {
194-
contents: BodyGuestContents::None,
195-
trailers: None,
166+
contents: MaybeTombstone::None,
167+
trailers: MaybeTombstone::None,
196168
buffer: Some(BodyFrame::Trailers(Err(err))),
197169
tx,
198170
content_length: None,
@@ -207,19 +179,21 @@ where
207179
return Ok(Err(err));
208180
}
209181
Body::Guest {
210-
contents: BodyGuestContents::None,
211-
trailers: Some(trailers),
182+
contents: MaybeTombstone::None,
183+
trailers: MaybeTombstone::Some(trailers),
212184
buffer: None,
213185
tx,
214186
content_length: None,
215187
} => {
216188
let (trailers_tx, trailers_rx) = oneshot::channel();
217-
let task = store.spawn(TrailerTask {
218-
rx: trailers,
219-
tx: trailers_tx,
189+
let task = AbortOnDropHandle(store.spawn_fn_box(|store| {
190+
Box::pin(handle_guest_trailers(store, trailers, trailers_tx))
191+
}));
192+
let body = empty_body().with_trailers(async {
193+
let result = trailers_rx.await.ok();
194+
drop(task);
195+
result
220196
});
221-
let body = empty_body()
222-
.with_trailers(wait_for_trailers(trailers_rx, AbortOnDropHandle(task)));
223197
let request = http::Request::from_parts(request, body);
224198
match client.send_request(request, options).await? {
225199
Ok((response, io)) => {
@@ -237,26 +211,29 @@ where
237211
}
238212
}
239213
Body::Guest {
240-
contents: BodyGuestContents::Some(contents),
241-
trailers: Some(trailers),
214+
contents: MaybeTombstone::Some(contents),
215+
trailers: MaybeTombstone::Some(trailers),
242216
buffer,
243217
tx,
244218
content_length,
245219
} => {
246220
let (trailers_tx, trailers_rx) = oneshot::channel();
247221
let (body_tx, body_rx) = mpsc::channel(1);
248-
let task = store.spawn(TrailerTask {
249-
rx: trailers,
250-
tx: trailers_tx,
251-
});
222+
let task = AbortOnDropHandle(store.spawn_fn_box(|store| {
223+
Box::pin(handle_guest_trailers(store, trailers, trailers_tx))
224+
}));
252225
let buffer = match buffer {
253226
Some(BodyFrame::Data(buffer)) => buffer,
254227
Some(BodyFrame::Trailers(..)) => bail!("guest body is corrupted"),
255228
None => Bytes::default(),
256229
};
257230
let body = BodyChannel::new(body_rx);
258-
let body = BodyWithContentLength::new(body, content_length)
259-
.with_trailers(wait_for_trailers(trailers_rx, AbortOnDropHandle(task)));
231+
let body =
232+
BodyWithContentLength::new(body, content_length).with_trailers(async move {
233+
let result = trailers_rx.await.ok();
234+
drop(task);
235+
result
236+
});
260237
let request = http::Request::from_parts(request, body);
261238
let (response, io) = match client.send_request(request, options).await? {
262239
Ok(pair) => pair,
@@ -425,18 +402,3 @@ where
425402
}
426403

427404
impl<T> handler::Host for WasiHttpImpl<T> where T: WasiHttpView {}
428-
429-
async fn wait_for_trailers(
430-
trailers: oneshot::Receiver<Result<Option<HeaderMap>, ErrorCode>>,
431-
trailer_task: AbortOnDropHandle,
432-
) -> Option<Result<HeaderMap, Option<ErrorCode>>> {
433-
let result = match trailers.await {
434-
Ok(Ok(Some(trailers))) => Some(Ok(trailers)),
435-
Ok(Ok(None)) => None,
436-
Ok(Err(err)) => Some(Err(Some(err))),
437-
Err(..) => Some(Err(None)), // future was dropped without writing a result
438-
};
439-
440-
drop(trailer_task);
441-
result
442-
}

0 commit comments

Comments
 (0)