|
| 1 | +use crate::p3::WasiHttpView; |
| 2 | +use crate::p3::bindings::http::types::{ErrorCode, Trailers}; |
| 3 | +use anyhow::Context as _; |
| 4 | +use bytes::{Bytes, BytesMut}; |
| 5 | +use core::future::poll_fn; |
| 6 | +use core::pin::{Pin, pin}; |
| 7 | +use core::task::{Context, Poll, ready}; |
| 8 | +use http_body_util::combinators::BoxBody; |
| 9 | +use std::sync::Arc; |
| 10 | +use tokio::sync::{mpsc, oneshot}; |
| 11 | +use wasmtime::component::{ |
| 12 | + Accessor, AccessorTask, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter, |
| 13 | + GuardedStreamReader, HasData, Resource, StreamReader, |
| 14 | +}; |
| 15 | + |
| 16 | +/// The concrete type behind a `wasi:http/types/body` resource. |
| 17 | +pub(crate) enum Body { |
| 18 | + /// Body constructed by the guest |
| 19 | + Guest(GuestBodyContext), |
| 20 | + /// Body constructed by the host. |
| 21 | + Host(BoxBody<Bytes, ErrorCode>), |
| 22 | + /// Body is consumed. |
| 23 | + Consumed, |
| 24 | +} |
| 25 | + |
| 26 | +/// Context of a body constructed by the guest |
| 27 | +pub struct GuestBodyContext { |
| 28 | + /// The body stream |
| 29 | + pub(crate) contents_rx: Option<StreamReader<u8>>, |
| 30 | + /// Future, on which guest will write result and optional trailers |
| 31 | + pub(crate) trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>, |
| 32 | + /// Future, on which transmission result will be written |
| 33 | + pub(crate) result_tx: FutureWriter<Result<(), ErrorCode>>, |
| 34 | +} |
| 35 | + |
| 36 | +pub struct GuestBodyTaskContext { |
| 37 | + pub(crate) cx: GuestBodyContext, |
| 38 | + pub(crate) contents_tx: mpsc::Sender<Bytes>, |
| 39 | + pub(crate) trailers_tx: oneshot::Sender<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>, |
| 40 | +} |
| 41 | + |
| 42 | +impl GuestBodyTaskContext { |
| 43 | + /// Consume the body given an I/O operation `io`. |
| 44 | + /// |
| 45 | + /// This function returns a [GuestBodyTask], which implements a [AccessorTask] and |
| 46 | + /// must be run using the engine's event loop. |
| 47 | + pub fn consume<Fut>(self, io: Fut) -> GuestBodyTask<Fut> |
| 48 | + where |
| 49 | + Fut: Future<Output = Result<(), ErrorCode>>, |
| 50 | + { |
| 51 | + GuestBodyTask { cx: self, io } |
| 52 | + } |
| 53 | +} |
| 54 | + |
| 55 | +pub struct GuestBodyTask<T> { |
| 56 | + cx: GuestBodyTaskContext, |
| 57 | + io: T, |
| 58 | +} |
| 59 | + |
| 60 | +impl<T, U, Fut> AccessorTask<T, U, wasmtime::Result<()>> for GuestBodyTask<Fut> |
| 61 | +where |
| 62 | + T: WasiHttpView, |
| 63 | + U: HasData, |
| 64 | + Fut: Future<Output = Result<(), ErrorCode>> + Send + 'static, |
| 65 | +{ |
| 66 | + async fn run(self, store: &Accessor<T, U>) -> wasmtime::Result<()> { |
| 67 | + let Self { |
| 68 | + cx: |
| 69 | + GuestBodyTaskContext { |
| 70 | + cx: |
| 71 | + GuestBodyContext { |
| 72 | + contents_rx, |
| 73 | + trailers_rx, |
| 74 | + result_tx, |
| 75 | + }, |
| 76 | + contents_tx, |
| 77 | + mut trailers_tx, |
| 78 | + }, |
| 79 | + io, |
| 80 | + } = self; |
| 81 | + let trailers_rx = GuardedFutureReader::new(store, trailers_rx); |
| 82 | + let mut result_tx = GuardedFutureWriter::new(store, result_tx); |
| 83 | + if let Some(contents_rx) = contents_rx { |
| 84 | + let mut contents_rx = GuardedStreamReader::new(store, contents_rx); |
| 85 | + // TODO: use content-length |
| 86 | + let mut buf = BytesMut::with_capacity(8192); |
| 87 | + while !contents_rx.is_closed() { |
| 88 | + let mut tx = pin!(contents_tx.reserve()); |
| 89 | + let Some(Ok(tx)) = ({ |
| 90 | + let mut contents_tx_dropped = pin!(contents_rx.watch_writer()); |
| 91 | + poll_fn(|cx| match contents_tx_dropped.as_mut().poll(cx) { |
| 92 | + Poll::Ready(()) => return Poll::Ready(None), |
| 93 | + Poll::Pending => tx.as_mut().poll(cx).map(Some), |
| 94 | + }) |
| 95 | + .await |
| 96 | + }) else { |
| 97 | + // Either: |
| 98 | + // - body receiver has been closed |
| 99 | + // - guest writer has been closed |
| 100 | + break; |
| 101 | + }; |
| 102 | + buf = contents_rx.read(buf).await; |
| 103 | + if !buf.is_empty() { |
| 104 | + tx.send(buf.split().freeze()); |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | + drop(contents_tx); |
| 109 | + |
| 110 | + let mut rx = pin!(trailers_rx.read()); |
| 111 | + match poll_fn(|cx| match trailers_tx.poll_closed(cx) { |
| 112 | + Poll::Ready(()) => return Poll::Ready(None), |
| 113 | + Poll::Pending => rx.as_mut().poll(cx).map(Some), |
| 114 | + }) |
| 115 | + .await |
| 116 | + { |
| 117 | + Some(Some(Ok(Some(trailers)))) => { |
| 118 | + let trailers = store.with(|mut store| { |
| 119 | + store |
| 120 | + .data_mut() |
| 121 | + .http() |
| 122 | + .table |
| 123 | + .delete(trailers) |
| 124 | + .context("failed to delete trailers") |
| 125 | + })?; |
| 126 | + _ = trailers_tx.send(Ok(Some(trailers.into()))); |
| 127 | + } |
| 128 | + Some(Some(Ok(None))) => { |
| 129 | + _ = trailers_tx.send(Ok(None)); |
| 130 | + } |
| 131 | + Some(Some(Err(err))) => { |
| 132 | + _ = trailers_tx.send(Err(err)); |
| 133 | + } |
| 134 | + Some(None) | None => { |
| 135 | + // Either: |
| 136 | + // - trailer receiver has been closed |
| 137 | + // - guest writer has been closed |
| 138 | + drop(trailers_tx); |
| 139 | + } |
| 140 | + } |
| 141 | + |
| 142 | + let mut io = pin!(io); |
| 143 | + if let Some(res) = { |
| 144 | + let mut result_rx_dropped = pin!(result_tx.watch_reader()); |
| 145 | + poll_fn(|cx| match result_rx_dropped.as_mut().poll(cx) { |
| 146 | + Poll::Ready(()) => return Poll::Ready(None), |
| 147 | + Poll::Pending => io.as_mut().poll(cx).map(Some), |
| 148 | + }) |
| 149 | + .await |
| 150 | + } { |
| 151 | + result_tx.write(res).await; |
| 152 | + } |
| 153 | + Ok(()) |
| 154 | + } |
| 155 | +} |
| 156 | + |
| 157 | +pub(crate) struct GuestBody { |
| 158 | + pub(crate) contents_rx: Option<mpsc::Receiver<Bytes>>, |
| 159 | + pub(crate) trailers_rx: |
| 160 | + Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>, |
| 161 | +} |
| 162 | + |
| 163 | +impl http_body::Body for GuestBody { |
| 164 | + type Data = Bytes; |
| 165 | + type Error = ErrorCode; |
| 166 | + |
| 167 | + fn poll_frame( |
| 168 | + mut self: Pin<&mut Self>, |
| 169 | + cx: &mut Context<'_>, |
| 170 | + ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> { |
| 171 | + if let Some(contents_rx) = self.contents_rx.as_mut() { |
| 172 | + while let Some(buf) = ready!(contents_rx.poll_recv(cx)) { |
| 173 | + return Poll::Ready(Some(Ok(http_body::Frame::data(buf)))); |
| 174 | + } |
| 175 | + self.contents_rx = None; |
| 176 | + } |
| 177 | + |
| 178 | + let Some(trailers_rx) = self.trailers_rx.as_mut() else { |
| 179 | + return Poll::Ready(None); |
| 180 | + }; |
| 181 | + |
| 182 | + let res = ready!(Pin::new(trailers_rx).poll(cx)); |
| 183 | + self.trailers_rx = None; |
| 184 | + match res { |
| 185 | + Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers( |
| 186 | + Arc::unwrap_or_clone(trailers), |
| 187 | + )))), |
| 188 | + Ok(Ok(None)) => Poll::Ready(None), |
| 189 | + Ok(Err(err)) => Poll::Ready(Some(Err(err))), |
| 190 | + Err(..) => Poll::Ready(None), |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + fn is_end_stream(&self) -> bool { |
| 195 | + if let Some(contents_rx) = self.contents_rx.as_ref() { |
| 196 | + if !contents_rx.is_empty() || !contents_rx.is_closed() { |
| 197 | + return false; |
| 198 | + } |
| 199 | + } |
| 200 | + if let Some(trailers_rx) = self.trailers_rx.as_ref() { |
| 201 | + if !trailers_rx.is_terminated() { |
| 202 | + return false; |
| 203 | + } |
| 204 | + } |
| 205 | + return true; |
| 206 | + } |
| 207 | + |
| 208 | + fn size_hint(&self) -> http_body::SizeHint { |
| 209 | + // TODO: use content-length |
| 210 | + http_body::SizeHint::default() |
| 211 | + } |
| 212 | +} |
| 213 | + |
| 214 | +pub(crate) struct ConsumedBody; |
| 215 | + |
| 216 | +impl http_body::Body for ConsumedBody { |
| 217 | + type Data = Bytes; |
| 218 | + type Error = ErrorCode; |
| 219 | + |
| 220 | + fn poll_frame( |
| 221 | + self: Pin<&mut Self>, |
| 222 | + _cx: &mut Context<'_>, |
| 223 | + ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> { |
| 224 | + Poll::Ready(Some(Err(ErrorCode::InternalError(Some( |
| 225 | + "body consumed".into(), |
| 226 | + ))))) |
| 227 | + } |
| 228 | + |
| 229 | + fn is_end_stream(&self) -> bool { |
| 230 | + true |
| 231 | + } |
| 232 | + |
| 233 | + fn size_hint(&self) -> http_body::SizeHint { |
| 234 | + http_body::SizeHint::with_exact(0) |
| 235 | + } |
| 236 | +} |
0 commit comments