Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.

Commit 275499c

Browse files
authored
Merge pull request #230 from alexcrichton/more-http-refactoring
Chip away at streams-without-accessors in wasi-http
2 parents 73d715e + e6ca346 commit 275499c

File tree

7 files changed

+220
-154
lines changed

7 files changed

+220
-154
lines changed

crates/test-programs/src/bin/p3_http_outbound_request_response_build.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
2525
request
2626
.set_authority(Some("www.example.com"))
2727
.expect("setting authority");
28-
let remaining = contents_tx.write_all(b"request-body".to_vec()).await;
29-
assert!(remaining.is_empty());
28+
let (remaining, ()) =
29+
futures::join!(contents_tx.write_all(b"request-body".to_vec()), async {
30+
drop(request);
31+
},);
32+
assert!(!remaining.is_empty());
3033
}
3134
{
3235
let headers = Headers::from_list(&[(
@@ -38,7 +41,7 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
3841
let (_, trailers_rx) = wit_future::new(|| Ok(None));
3942
let _ = Response::new(headers, Some(contents_rx), trailers_rx);
4043
let remaining = contents_tx.write_all(b"response-body".to_vec()).await;
41-
assert!(remaining.is_empty());
44+
assert!(!remaining.is_empty());
4245
}
4346

4447
{

crates/wasi-http/src/p3/body.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ use wasmtime_wasi::p3::WithChildren;
1515

1616
use crate::p3::bindings::http::types::ErrorCode;
1717

18-
pub(crate) type OutgoingContentsStreamFuture =
19-
Pin<Box<dyn Future<Output = (Option<StreamReader<BytesMut>>, BytesMut)> + Send + 'static>>;
20-
2118
pub(crate) type OutgoingTrailerFuture = Pin<
2219
Box<
2320
dyn Future<Output = Option<Result<Option<Resource<WithChildren<HeaderMap>>>, ErrorCode>>>
@@ -62,7 +59,7 @@ pub enum Body {
6259
/// Body constructed by the guest
6360
Guest {
6461
/// The body stream
65-
contents: Option<OutgoingContentsStreamFuture>,
62+
contents: BodyGuestContents,
6663
/// Future, on which guest will write result and optional trailers
6764
trailers: Option<OutgoingTrailerFuture>,
6865
/// Buffered frame, if any
@@ -83,6 +80,22 @@ pub enum Body {
8380
Consumed,
8481
}
8582

83+
/// Variants of `Body::Guest::contents`.
84+
pub enum BodyGuestContents {
85+
/// The guest body is this provided stream.
86+
Some(StreamReader<BytesMut>),
87+
88+
/// The guest body was previously taken into a body task, and that body task
89+
/// has finished.
90+
///
91+
/// In this situation the guest body can no longer be read due to a bug in
92+
/// Wasmtime where cancellation of an in-progress read is not yet supported.
93+
Taken,
94+
95+
/// The guest body is not provided.
96+
None,
97+
}
98+
8699
impl Body {
87100
/// Construct a new [Body]
88101
pub fn new<T>(body: T) -> Self

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use super::{delete_request, get_fields_inner, push_response};
22
use crate::p3::bindings::http::handler;
33
use crate::p3::bindings::http::types::ErrorCode;
44
use crate::p3::{
5-
Body, BodyChannel, BodyFrame, BodyWithContentLength, Client as _, ContentLength,
6-
DEFAULT_BUFFER_CAPACITY, OutgoingTrailerFuture, Request, Response, WasiHttp, WasiHttpImpl,
7-
WasiHttpView, empty_body,
5+
Body, BodyChannel, BodyFrame, BodyGuestContents, BodyWithContentLength, Client as _,
6+
ContentLength, DEFAULT_BUFFER_CAPACITY, OutgoingTrailerFuture, Request, Response, WasiHttp,
7+
WasiHttpImpl, WasiHttpView, empty_body,
88
};
99
use anyhow::bail;
10-
use bytes::Bytes;
10+
use bytes::{Bytes, BytesMut};
1111
use core::iter;
1212
use futures::StreamExt as _;
1313
use http::header::HOST;
@@ -125,7 +125,7 @@ where
125125
let (request, ()) = request.into_parts();
126126
let response = match body {
127127
Body::Guest {
128-
contents: None,
128+
contents: BodyGuestContents::None,
129129
buffer: Some(BodyFrame::Trailers(Ok(None))) | None,
130130
tx,
131131
content_length: Some(ContentLength { limit, sent }),
@@ -139,7 +139,7 @@ where
139139
return Ok(Err(ErrorCode::HttpRequestBodySize(Some(sent))));
140140
}
141141
Body::Guest {
142-
contents: None,
142+
contents: BodyGuestContents::None,
143143
trailers: None,
144144
buffer: Some(BodyFrame::Trailers(Ok(None))),
145145
tx,
@@ -163,7 +163,7 @@ where
163163
}
164164
}
165165
Body::Guest {
166-
contents: None,
166+
contents: BodyGuestContents::None,
167167
trailers: None,
168168
buffer: Some(BodyFrame::Trailers(Ok(Some(trailers)))),
169169
tx,
@@ -191,7 +191,7 @@ where
191191
}
192192
}
193193
Body::Guest {
194-
contents: None,
194+
contents: BodyGuestContents::None,
195195
trailers: None,
196196
buffer: Some(BodyFrame::Trailers(Err(err))),
197197
tx,
@@ -207,7 +207,7 @@ where
207207
return Ok(Err(err));
208208
}
209209
Body::Guest {
210-
contents: None,
210+
contents: BodyGuestContents::None,
211211
trailers: Some(trailers),
212212
buffer: None,
213213
tx,
@@ -237,7 +237,7 @@ where
237237
}
238238
}
239239
Body::Guest {
240-
contents: Some(contents),
240+
contents: BodyGuestContents::Some(contents),
241241
trailers: Some(trailers),
242242
buffer,
243243
tx,
@@ -267,7 +267,9 @@ where
267267
io,
268268
async {
269269
body_tx.send(Ok(buffer)).await?;
270-
let (mut tail, mut rx_buffer) = contents.await;
270+
let (mut tail, mut rx_buffer) = contents
271+
.read(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY))
272+
.await;
271273
loop {
272274
let buffer = rx_buffer.split();
273275
body_tx.send(Ok(buffer.freeze())).await?;

crates/wasi-http/src/p3/host/types.rs

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use crate::p3::host::{
99
push_request, push_response,
1010
};
1111
use crate::p3::{
12-
Body, BodyContext, BodyFrame, ContentLength, DEFAULT_BUFFER_CAPACITY, Request, RequestOptions,
13-
Response, WasiHttp, WasiHttpImpl, WasiHttpView,
12+
Body, BodyContext, BodyFrame, BodyGuestContents, ContentLength, DEFAULT_BUFFER_CAPACITY,
13+
Request, RequestOptions, Response, WasiHttp, WasiHttpImpl, WasiHttpView,
1414
};
1515
use anyhow::{Context as _, bail};
1616
use bytes::{Bytes, BytesMut};
1717
use core::future::Future;
1818
use core::future::poll_fn;
1919
use core::mem;
2020
use core::ops::{Deref, DerefMut};
21-
use core::pin::Pin;
21+
use core::pin::{Pin, pin};
2222
use core::str;
2323
use core::task::Poll;
2424
use futures::join;
@@ -129,7 +129,7 @@ where
129129
};
130130
match body {
131131
Body::Guest {
132-
contents: None,
132+
contents: BodyGuestContents::None,
133133
buffer: Some(BodyFrame::Trailers(Ok(None))) | None,
134134
tx,
135135
content_length: Some(ContentLength { limit, sent }),
@@ -149,7 +149,7 @@ where
149149
return Ok(());
150150
}
151151
Body::Guest {
152-
contents: None,
152+
contents: BodyGuestContents::None,
153153
trailers: Some(mut trailers_rx),
154154
buffer: None,
155155
tx,
@@ -168,7 +168,7 @@ where
168168
bail!("lock poisoned");
169169
};
170170
*body = Body::Guest {
171-
contents: None,
171+
contents: BodyGuestContents::None,
172172
trailers: Some(trailers_rx),
173173
buffer: None,
174174
tx,
@@ -182,7 +182,7 @@ where
182182
bail!("lock poisoned");
183183
};
184184
*body = Body::Guest {
185-
contents: None,
185+
contents: BodyGuestContents::None,
186186
trailers: None,
187187
buffer: Some(BodyFrame::Trailers(res)),
188188
tx,
@@ -194,7 +194,7 @@ where
194194
Ok(())
195195
}
196196
Body::Guest {
197-
contents: None,
197+
contents: BodyGuestContents::None,
198198
trailers: None,
199199
buffer: Some(BodyFrame::Trailers(res)),
200200
tx,
@@ -206,7 +206,7 @@ where
206206
bail!("lock poisoned");
207207
};
208208
*body = Body::Guest {
209-
contents: None,
209+
contents: BodyGuestContents::None,
210210
trailers: None,
211211
buffer: Some(BodyFrame::Trailers(res)),
212212
tx,
@@ -218,7 +218,7 @@ where
218218
Ok(())
219219
}
220220
Body::Guest {
221-
contents: Some(mut contents_rx),
221+
contents: BodyGuestContents::Some(mut contents_rx),
222222
trailers: Some(mut trailers_rx),
223223
buffer,
224224
tx,
@@ -235,7 +235,7 @@ where
235235
let pos = buffer.position().try_into()?;
236236
let buffer = buffer.into_inner().split_off(pos);
237237
*body = Body::Guest {
238-
contents: Some(contents_rx),
238+
contents: BodyGuestContents::Some(contents_rx),
239239
trailers: Some(trailers_rx),
240240
buffer: Some(BodyFrame::Data(buffer)),
241241
tx,
@@ -250,11 +250,13 @@ where
250250
}
251251
let (contents_tx_drop, mut contents_tx) = contents_tx.watch_reader();
252252
let mut contents_tx_drop = Box::pin(contents_tx_drop);
253+
let mut rx_buffer = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY);
253254
loop {
254-
let Some((rx_tail, mut rx_buffer)) =
255+
let mut next_chunk = pin!(contents_rx.read(rx_buffer));
256+
let Some((rx_tail, b)) =
255257
poll_fn(|cx| match contents_tx_drop.as_mut().poll(cx) {
256258
Poll::Ready(()) => return Poll::Ready(None),
257-
Poll::Pending => contents_rx.as_mut().poll(cx).map(Some),
259+
Poll::Pending => next_chunk.as_mut().poll(cx).map(Some),
258260
})
259261
.await
260262
else {
@@ -263,14 +265,18 @@ where
263265
bail!("lock poisoned");
264266
};
265267
*body = Body::Guest {
266-
contents: Some(contents_rx),
268+
// FIXME: cancellation support should be added to
269+
// reads in Wasmtime to fully support this to avoid
270+
// needing `Taken` at all.
271+
contents: BodyGuestContents::Taken,
267272
trailers: Some(trailers_rx),
268273
buffer: None,
269274
tx,
270275
content_length,
271276
};
272277
return Ok(());
273278
};
279+
rx_buffer = b;
274280
let tx_tail = contents_tx.into_inner();
275281
let Some(rx_tail) = rx_tail else {
276282
debug_assert!(rx_buffer.is_empty());
@@ -322,7 +328,7 @@ where
322328
}
323329
let buffer = rx_buffer.split().freeze();
324330
rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY);
325-
contents_rx = Box::pin(rx_tail.read(rx_buffer));
331+
contents_rx = rx_tail;
326332
let (tx_tail, buffer) = tx_tail.write_all(Cursor::new(buffer)).await;
327333
let Some(tx_tail) = tx_tail else {
328334
let Ok(mut body) = self.body.lock() else {
@@ -331,7 +337,7 @@ where
331337
let pos = buffer.position().try_into()?;
332338
let buffer = buffer.into_inner().split_off(pos);
333339
*body = Body::Guest {
334-
contents: Some(contents_rx),
340+
contents: BodyGuestContents::Some(contents_rx),
335341
trailers: Some(trailers_rx),
336342
buffer: Some(BodyFrame::Data(buffer)),
337343
tx,
@@ -356,7 +362,7 @@ where
356362
bail!("lock poisoned");
357363
};
358364
*body = Body::Guest {
359-
contents: None,
365+
contents: BodyGuestContents::None,
360366
trailers: Some(trailers_rx),
361367
buffer: None,
362368
tx,
@@ -370,7 +376,7 @@ where
370376
bail!("lock poisoned");
371377
};
372378
*body = Body::Guest {
373-
contents: None,
379+
contents: BodyGuestContents::None,
374380
trailers: None,
375381
buffer: Some(BodyFrame::Trailers(res)),
376382
tx,
@@ -751,11 +757,10 @@ where
751757
let (res_tx, res_rx) = instance
752758
.future(|| Ok(()), &mut view)
753759
.context("failed to create future")?;
754-
let contents = contents.map(|contents| {
755-
contents
756-
.into_reader(&mut view)
757-
.read(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY))
758-
});
760+
let contents = match contents {
761+
Some(contents) => BodyGuestContents::Some(contents.into_reader(&mut view)),
762+
None => BodyGuestContents::None,
763+
};
759764
let trailers = trailers.into_reader(&mut view).read();
760765
let mut binding = view.get();
761766
let table = binding.table();
@@ -769,7 +774,7 @@ where
769774
})
770775
.transpose()?;
771776
let body = Body::Guest {
772-
contents: contents.map(|v| Box::pin(v) as _),
777+
contents,
773778
trailers: Some(Box::pin(trailers)),
774779
buffer: None,
775780
tx: res_tx,
@@ -1077,19 +1082,18 @@ where
10771082
let (res_tx, res_rx) = instance
10781083
.future(|| Ok(()), &mut view)
10791084
.context("failed to create future")?;
1080-
let contents = contents.map(|contents| {
1081-
contents
1082-
.into_reader(&mut view)
1083-
.read(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY))
1084-
});
1085+
let contents = match contents {
1086+
Some(contents) => BodyGuestContents::Some(contents.into_reader(&mut view)),
1087+
None => BodyGuestContents::None,
1088+
};
10851089
let trailers = trailers.into_reader(&mut view).read();
10861090
let mut binding = view.get();
10871091
let table = binding.table();
10881092
let headers = delete_fields(table, headers)?;
10891093
let headers = headers.unwrap_or_clone()?;
10901094
let content_length = get_content_length(&headers)?;
10911095
let body = Body::Guest {
1092-
contents: contents.map(|v| Box::pin(v) as _),
1096+
contents,
10931097
trailers: Some(Box::pin(trailers)),
10941098
buffer: None,
10951099
tx: res_tx,

0 commit comments

Comments
 (0)