@@ -37,55 +37,72 @@ pub(crate) enum Body {
3737 /// Channel, on which transmission result will be written
3838 result_tx : oneshot:: Sender < Box < dyn Future < Output = Result < ( ) , ErrorCode > > + Send > > ,
3939 } ,
40- /// Body is consumed.
41- Consumed ,
40+ }
41+
42+ /// [FutureConsumer] implementation for future passed to `consume-body`.
43+ struct BodyResultConsumer (
44+ Option < oneshot:: Sender < Box < dyn Future < Output = Result < ( ) , ErrorCode > > + Send > > > ,
45+ ) ;
46+
47+ impl < D > FutureConsumer < D > for BodyResultConsumer
48+ where
49+ D : ' static ,
50+ {
51+ type Item = Result < ( ) , ErrorCode > ;
52+
53+ fn poll_consume (
54+ mut self : Pin < & mut Self > ,
55+ _: & mut Context < ' _ > ,
56+ store : StoreContextMut < D > ,
57+ mut src : Source < ' _ , Self :: Item > ,
58+ _: bool ,
59+ ) -> Poll < wasmtime:: Result < ( ) > > {
60+ let mut res = None ;
61+ src. read ( store, & mut res) . context ( "failed to read result" ) ?;
62+ let res = res. context ( "result value missing" ) ?;
63+ let tx = self . 0 . take ( ) . context ( "polled after returning `Ready`" ) ?;
64+ _ = tx. send ( Box :: new ( async { res } ) ) ;
65+ Poll :: Ready ( Ok ( ( ) ) )
66+ }
4267}
4368
4469impl Body {
4570 /// Implementation of `consume-body` shared between requests and responses
4671 pub ( crate ) fn consume < T > (
4772 self ,
4873 mut store : Access < ' _ , T , WasiHttp > ,
74+ fut : FutureReader < Result < ( ) , ErrorCode > > ,
4975 getter : fn ( & mut T ) -> WasiHttpCtxView < ' _ > ,
50- ) -> Result <
51- (
52- StreamReader < u8 > ,
53- FutureReader < Result < Option < Resource < Trailers > > , ErrorCode > > ,
54- ) ,
55- ( ) ,
56- > {
76+ ) -> (
77+ StreamReader < u8 > ,
78+ FutureReader < Result < Option < Resource < Trailers > > , ErrorCode > > ,
79+ ) {
5780 match self {
5881 Body :: Guest {
5982 contents_rx : Some ( contents_rx) ,
6083 trailers_rx,
6184 result_tx,
6285 } => {
63- // TODO: Use a result specified by the caller
64- // https://github.com/WebAssembly/wasi-http/issues/176
65- _ = result_tx. send ( Box :: new ( async { Ok ( ( ) ) } ) ) ;
66- Ok ( ( contents_rx, trailers_rx) )
86+ fut. pipe ( & mut store, BodyResultConsumer ( Some ( result_tx) ) ) ;
87+ ( contents_rx, trailers_rx)
6788 }
6889 Body :: Guest {
6990 contents_rx : None ,
7091 trailers_rx,
7192 result_tx,
7293 } => {
94+ fut. pipe ( & mut store, BodyResultConsumer ( Some ( result_tx) ) ) ;
7395 let instance = store. instance ( ) ;
74- // TODO: Use a result specified by the caller
75- // https://github.com/WebAssembly/wasi-http/issues/176
76- _ = result_tx. send ( Box :: new ( async { Ok ( ( ) ) } ) ) ;
77- Ok ( (
96+ (
7897 StreamReader :: new ( instance, & mut store, StreamEmptyProducer :: default ( ) ) ,
7998 trailers_rx,
80- ) )
99+ )
81100 }
82101 Body :: Host { body, result_tx } => {
102+ fut. pipe ( & mut store, BodyResultConsumer ( Some ( result_tx) ) ) ;
83103 let instance = store. instance ( ) ;
84- // TODO: Use a result specified by the caller
85- // https://github.com/WebAssembly/wasi-http/issues/176
86- _ = result_tx. send ( Box :: new ( async { Ok ( ( ) ) } ) ) ;
87104 let ( trailers_tx, trailers_rx) = oneshot:: channel ( ) ;
88- Ok ( (
105+ (
89106 StreamReader :: new (
90107 instance,
91108 & mut store,
@@ -100,9 +117,8 @@ impl Body {
100117 & mut store,
101118 FutureOneshotProducer :: from ( trailers_rx) ,
102119 ) ,
103- ) )
120+ )
104121 }
105- Body :: Consumed => Err ( ( ) ) ,
106122 }
107123 }
108124
@@ -394,31 +410,6 @@ impl http_body::Body for GuestBody {
394410 }
395411}
396412
397- /// [http_body::Body] that has been consumed.
398- pub ( crate ) struct ConsumedBody ;
399-
400- impl http_body:: Body for ConsumedBody {
401- type Data = Bytes ;
402- type Error = ErrorCode ;
403-
404- fn poll_frame (
405- self : Pin < & mut Self > ,
406- _cx : & mut Context < ' _ > ,
407- ) -> Poll < Option < Result < http_body:: Frame < Self :: Data > , Self :: Error > > > {
408- Poll :: Ready ( Some ( Err ( ErrorCode :: InternalError ( Some (
409- "body consumed" . into ( ) ,
410- ) ) ) ) )
411- }
412-
413- fn is_end_stream ( & self ) -> bool {
414- true
415- }
416-
417- fn size_hint ( & self ) -> http_body:: SizeHint {
418- http_body:: SizeHint :: with_exact ( 0 )
419- }
420- }
421-
422413/// [FutureConsumer] implementation for trailers originating in the guest.
423414struct GuestTrailerConsumer < T > {
424415 tx : Option < oneshot:: Sender < Result < Option < Arc < HeaderMap > > , ErrorCode > > > ,
@@ -438,10 +429,10 @@ where
438429 mut src : Source < ' _ , Self :: Item > ,
439430 _: bool ,
440431 ) -> Poll < wasmtime:: Result < ( ) > > {
441- let mut result = None ;
442- src. read ( store. as_context_mut ( ) , & mut result )
432+ let mut res = None ;
433+ src. read ( & mut store, & mut res )
443434 . context ( "failed to read result" ) ?;
444- let res = match result . context ( "result value missing" ) ? {
435+ let res = match res . context ( "result value missing" ) ? {
445436 Ok ( Some ( trailers) ) => {
446437 let WasiHttpCtxView { table, .. } = ( self . getter ) ( store. data_mut ( ) ) ;
447438 let trailers = table
0 commit comments