@@ -42,6 +42,7 @@ pub(crate) enum Body {
4242}
4343
4444impl Body {
45+ /// Implementation of `consume-body` shared between requests and responses
4546 pub ( crate ) fn consume < T > (
4647 self ,
4748 mut store : Access < ' _ , T , WasiHttp > ,
@@ -105,6 +106,7 @@ impl Body {
105106 }
106107 }
107108
109+ /// Implementation of `drop` shared between requests and responses
108110 pub ( crate ) fn drop ( self , mut store : impl AsContextMut ) {
109111 if let Body :: Guest {
110112 contents_rx,
@@ -120,7 +122,8 @@ impl Body {
120122 }
121123}
122124
123- pub ( crate ) enum GuestBodyKind {
125+ /// The kind of body, used for error reporting
126+ pub ( crate ) enum BodyKind {
124127 Request ,
125128 Response ,
126129}
@@ -141,20 +144,22 @@ impl ContentLength {
141144 }
142145}
143146
147+ /// [StreamConsumer] implementation for bodies originating in the guest.
144148struct GuestBodyConsumer {
145149 contents_tx : PollSender < Result < Bytes , ErrorCode > > ,
146150 result_tx : Option < oneshot:: Sender < Result < ( ) , ErrorCode > > > ,
147151 content_length : Option < ContentLength > ,
148- kind : GuestBodyKind ,
152+ kind : BodyKind ,
149153 // `true` when the other side of `contents_tx` was unexpectedly closed
150154 closed : bool ,
151155}
152156
153157impl GuestBodyConsumer {
158+ /// Constructs the approprite body size error given the [BodyKind]
154159 fn body_size_error ( & self , n : Option < u64 > ) -> ErrorCode {
155160 match self . kind {
156- GuestBodyKind :: Request => ErrorCode :: HttpRequestBodySize ( n) ,
157- GuestBodyKind :: Response => ErrorCode :: HttpResponseBodySize ( n) ,
161+ BodyKind :: Request => ErrorCode :: HttpRequestBodySize ( n) ,
162+ BodyKind :: Response => ErrorCode :: HttpResponseBodySize ( n) ,
158163 }
159164 }
160165
@@ -235,20 +240,22 @@ impl<D> StreamConsumer<D> for GuestBodyConsumer {
235240 }
236241}
237242
243+ /// [http_body::Body] implementation for bodies originating in the guest.
238244pub ( crate ) struct GuestBody {
239245 contents_rx : Option < mpsc:: Receiver < Result < Bytes , ErrorCode > > > ,
240246 trailers_rx : Option < oneshot:: Receiver < Result < Option < Arc < http:: HeaderMap > > , ErrorCode > > > ,
241247 content_length : Option < u64 > ,
242248}
243249
244250impl GuestBody {
251+ /// Construct a new [GuestBody]
245252 pub ( crate ) fn new < T : ' static > (
246253 mut store : impl AsContextMut < Data = T > ,
247254 contents_rx : Option < StreamReader < u8 > > ,
248255 trailers_rx : FutureReader < Result < Option < Resource < Trailers > > , ErrorCode > > ,
249256 result_tx : oneshot:: Sender < Result < ( ) , ErrorCode > > ,
250257 content_length : Option < u64 > ,
251- kind : GuestBodyKind ,
258+ kind : BodyKind ,
252259 getter : fn ( & mut T ) -> WasiHttpCtxView < ' _ > ,
253260 ) -> Self {
254261 let ( trailers_http_tx, trailers_http_rx) = oneshot:: channel ( ) ;
@@ -290,10 +297,15 @@ impl http_body::Body for GuestBody {
290297 cx : & mut Context < ' _ > ,
291298 ) -> Poll < Option < Result < http_body:: Frame < Self :: Data > , Self :: Error > > > {
292299 if let Some ( contents_rx) = self . contents_rx . as_mut ( ) {
300+ // `contents_rx` has not been closed yet, poll it
293301 while let Some ( res) = ready ! ( contents_rx. poll_recv( cx) ) {
294302 match res {
295303 Ok ( buf) => {
296304 if let Some ( n) = self . content_length . as_mut ( ) {
305+ // Substract frame length from `content_length`,
306+ // [GuestBodyConsumer] already performs the validation, so
307+ // just keep count as optimization for
308+ // `is_end_stream` and `size_hint`
297309 * n = n. saturating_sub ( buf. len ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ) ;
298310 }
299311 return Poll :: Ready ( Some ( Ok ( http_body:: Frame :: data ( buf) ) ) ) ;
@@ -303,14 +315,17 @@ impl http_body::Body for GuestBody {
303315 }
304316 }
305317 }
318+ // Record that `contents_rx` is closed
306319 self . contents_rx = None ;
307320 }
308321
309322 let Some ( trailers_rx) = self . trailers_rx . as_mut ( ) else {
323+ // `trailers_rx` has already terminated - this is the end of stream
310324 return Poll :: Ready ( None ) ;
311325 } ;
312326
313327 let res = ready ! ( Pin :: new( trailers_rx) . poll( cx) ) ;
328+ // Record that `trailers_rx` has terminated
314329 self . trailers_rx = None ;
315330 match res {
316331 Ok ( Ok ( Some ( trailers) ) ) => Poll :: Ready ( Some ( Ok ( http_body:: Frame :: trailers (
@@ -328,14 +343,18 @@ impl http_body::Body for GuestBody {
328343 || !contents_rx. is_closed ( )
329344 || self . content_length . is_some_and ( |n| n > 0 )
330345 {
346+ // `contents_rx` might still produce data frames
331347 return false ;
332348 }
333349 }
334350 if let Some ( trailers_rx) = self . trailers_rx . as_ref ( ) {
335351 if !trailers_rx. is_terminated ( ) {
352+ // `trailers_rx` has not terminated yet
336353 return false ;
337354 }
338355 }
356+
357+ // no data left
339358 return true ;
340359 }
341360
@@ -348,6 +367,7 @@ impl http_body::Body for GuestBody {
348367 }
349368}
350369
370+ /// [http_body::Body] that has been consumed.
351371pub ( crate ) struct ConsumedBody ;
352372
353373impl http_body:: Body for ConsumedBody {
@@ -372,6 +392,7 @@ impl http_body::Body for ConsumedBody {
372392 }
373393}
374394
395+ /// [FutureConsumer] implementation for trailers originating in the guest.
375396struct GuestTrailerConsumer < T > {
376397 tx : Option < oneshot:: Sender < Result < Option < Arc < HeaderMap > > , ErrorCode > > > ,
377398 getter : fn ( & mut T ) -> WasiHttpCtxView < ' _ > ,
@@ -409,6 +430,7 @@ where
409430 }
410431}
411432
433+ /// [StreamProducer] implementation for bodies originating in the host.
412434struct HostBodyStreamProducer < T > {
413435 body : BoxBody < Bytes , ErrorCode > ,
414436 trailers : Option < oneshot:: Sender < Result < Option < Resource < Trailers > > , ErrorCode > > > ,
@@ -447,6 +469,8 @@ where
447469 let cap = match dst. remaining ( & mut store) . map ( NonZeroUsize :: new) {
448470 Some ( Some ( cap) ) => Some ( cap) ,
449471 Some ( None ) => {
472+ // On 0-length the best we can do is check that underlying stream has not
473+ // reached the end yet
450474 if self . body . is_end_stream ( ) {
451475 break ' result Ok ( None ) ;
452476 } else {
@@ -463,11 +487,13 @@ where
463487 let n = frame. len ( ) ;
464488 let cap = cap. into ( ) ;
465489 if n > cap {
490+ // data frame does not fit in destination, fill it and buffer the rest
466491 dst. set_buffer ( Cursor :: new ( frame. split_off ( cap) ) ) ;
467492 let mut dst = dst. as_direct ( store, cap) ;
468493 dst. remaining ( ) . copy_from_slice ( & frame) ;
469494 dst. mark_written ( cap) ;
470495 } else {
496+ // copy the whole frame into the destination
471497 let mut dst = dst. as_direct ( store, n) ;
472498 dst. remaining ( ) [ ..n] . copy_from_slice ( & frame) ;
473499 dst. mark_written ( n) ;
0 commit comments