11use crate :: LambdaInvocation ;
22use futures:: { future:: BoxFuture , ready, FutureExt , TryFutureExt } ;
3+ use http:: StatusCode ;
4+ use http_body_util:: BodyExt ;
35use hyper:: body:: Incoming ;
46use lambda_runtime_api_client:: { body:: Body , BoxError , Client } ;
57use pin_project:: pin_project;
4042 fn call ( & mut self , req : LambdaInvocation ) -> Self :: Future {
4143 let request_fut = self . inner . call ( req) ;
4244 let client = self . client . clone ( ) ;
43- RuntimeApiClientFuture :: First ( request_fut, client)
45+ RuntimeApiClientFuture :: BuildingRequest ( request_fut, client)
4446 }
4547}
4648
5860
5961#[ pin_project( project = RuntimeApiClientFutureProj ) ]
6062pub enum RuntimeApiClientFuture < F > {
61- First ( #[ pin] F , Arc < Client > ) ,
62- Second ( #[ pin] BoxFuture < ' static , Result < http:: Response < Incoming > , BoxError > > ) ,
63+ /// The inner service is building the HTTP request to send to the Lambda Runtime API.
64+ BuildingRequest ( #[ pin] F , Arc < Client > ) ,
65+ /// The HTTP request is in-flight; waiting for the Lambda Runtime API response.
66+ SendingRequest ( #[ pin] BoxFuture < ' static , Result < http:: Response < Incoming > , BoxError > > ) ,
67+ /// The Lambda Runtime API returned a non-2xx response; reading the response body for logging.
68+ ReadingErrorBody ( StatusCode , #[ pin] BoxFuture < ' static , Result < Vec < u8 > , BoxError > > ) ,
6369}
6470
6571impl < F > Future for RuntimeApiClientFuture < F >
7278 // NOTE: We loop here to directly poll the second future once the first has finished.
7379 task:: Poll :: Ready ( loop {
7480 match self . as_mut ( ) . project ( ) {
75- RuntimeApiClientFutureProj :: First ( fut, client) => match ready ! ( fut. poll( cx) ) {
81+ RuntimeApiClientFutureProj :: BuildingRequest ( fut, client) => match ready ! ( fut. poll( cx) ) {
7682 Ok ( ok) => {
7783 // NOTE: We use 'client.call_boxed' here to obtain a future with static
7884 // lifetime. Otherwise, this future would need to be self-referential...
@@ -83,12 +89,127 @@ where
8389 err
8490 } )
8591 . boxed ( ) ;
86- self . set ( RuntimeApiClientFuture :: Second ( next_fut) ) ;
92+ self . set ( RuntimeApiClientFuture :: SendingRequest ( next_fut) ) ;
8793 }
8894 Err ( err) => break Err ( err) ,
8995 } ,
90- RuntimeApiClientFutureProj :: Second ( fut) => break ready ! ( fut. poll( cx) ) . map ( |_| ( ) ) ,
96+ RuntimeApiClientFutureProj :: SendingRequest ( fut) => match ready ! ( fut. poll( cx) ) {
97+ Ok ( resp) if !resp. status ( ) . is_success ( ) => {
98+ let status = resp. status ( ) ;
99+ let body_fut = resp
100+ . into_body ( )
101+ . collect ( )
102+ . map_ok ( |b| b. to_bytes ( ) . to_vec ( ) )
103+ . map_err ( BoxError :: from)
104+ . boxed ( ) ;
105+ self . set ( RuntimeApiClientFuture :: ReadingErrorBody ( status, body_fut) ) ;
106+ }
107+ Ok ( _) => break Ok ( ( ) ) ,
108+ Err ( err) => break Err ( err) ,
109+ } ,
110+ RuntimeApiClientFutureProj :: ReadingErrorBody ( status, fut) => {
111+ match ready ! ( fut. poll( cx) ) {
112+ Ok ( body) => {
113+ let body_str = String :: from_utf8_lossy ( & body) ;
114+ error ! ( status = %status, body = %body_str, "Lambda Runtime API returned non-200 response" ) ;
115+ }
116+ Err ( err) => {
117+ error ! ( status = %status, error = ?err, "Lambda Runtime API returned non-200 response; failed to read body" ) ;
118+ }
119+ }
120+ break Ok ( ( ) ) ;
121+ }
91122 }
92123 } )
93124 }
94125}
126+
127+ #[ cfg( test) ]
128+ mod tests {
129+ use super :: * ;
130+ use crate :: types:: Context ;
131+ use bytes:: Bytes ;
132+ use httpmock:: prelude:: * ;
133+ use std:: future;
134+ use tower:: Service ;
135+
136+ fn make_invocation ( ) -> LambdaInvocation {
137+ let ( parts, _) = http:: Response :: new ( ( ) ) . into_parts ( ) ;
138+ LambdaInvocation {
139+ parts,
140+ body : Bytes :: new ( ) ,
141+ context : Context :: default ( ) ,
142+ }
143+ }
144+
145+ fn make_client ( server : & MockServer ) -> Arc < Client > {
146+ let uri = format ! ( "http://{}:{}" , server. host( ) , server. port( ) ) . parse ( ) . unwrap ( ) ;
147+ Arc :: new ( Client :: builder ( ) . with_endpoint ( uri) . build ( ) . unwrap ( ) )
148+ }
149+
150+ /// Builds a mock inner service that always returns the given request.
151+ fn mock_inner (
152+ req : http:: Request < Body > ,
153+ ) -> impl Service <
154+ LambdaInvocation ,
155+ Response = http:: Request < Body > ,
156+ Error = BoxError ,
157+ Future = impl Future < Output = Result < http:: Request < Body > , BoxError > > ,
158+ > {
159+ tower:: service_fn ( move |_inv : LambdaInvocation | {
160+ let req = http:: Request :: builder ( )
161+ . uri ( req. uri ( ) . clone ( ) )
162+ . method ( req. method ( ) . clone ( ) )
163+ . body ( Body :: empty ( ) )
164+ . unwrap ( ) ;
165+ future:: ready ( Ok :: < _ , BoxError > ( req) )
166+ } )
167+ }
168+
169+ #[ tokio:: test]
170+ async fn test_2xx_response_succeeds ( ) {
171+ let server = MockServer :: start ( ) ;
172+ server. mock ( |when, then| {
173+ when. any_request ( ) ;
174+ then. status ( 200 ) . body ( "ok" ) ;
175+ } ) ;
176+
177+ let client = make_client ( & server) ;
178+ let req = http:: Request :: builder ( ) . uri ( "/some/path" ) . body ( Body :: empty ( ) ) . unwrap ( ) ;
179+ let mut svc = RuntimeApiClientService :: new ( mock_inner ( req) , client) ;
180+
181+ let result = svc. call ( make_invocation ( ) ) . await ;
182+ assert ! ( result. is_ok( ) ) ;
183+ }
184+
185+ #[ tokio:: test]
186+ async fn test_non_2xx_response_is_logged_and_succeeds ( ) {
187+ let server = MockServer :: start ( ) ;
188+ server. mock ( |when, then| {
189+ when. any_request ( ) ;
190+ then. status ( 500 ) . body ( "internal server error" ) ;
191+ } ) ;
192+
193+ let client = make_client ( & server) ;
194+ let req = http:: Request :: builder ( ) . uri ( "/some/path" ) . body ( Body :: empty ( ) ) . unwrap ( ) ;
195+ let mut svc = RuntimeApiClientService :: new ( mock_inner ( req) , client) ;
196+
197+ // Non-2xx should still resolve Ok(()) — the error is logged, not propagated.
198+ let result = svc. call ( make_invocation ( ) ) . await ;
199+ assert ! ( result. is_ok( ) ) ;
200+ }
201+
202+ #[ tokio:: test]
203+ async fn test_inner_service_error_propagates ( ) {
204+ let server = MockServer :: start ( ) ;
205+ let client = make_client ( & server) ;
206+
207+ let failing_inner = tower:: service_fn ( |_: LambdaInvocation | {
208+ future:: ready ( Err :: < http:: Request < Body > , BoxError > ( "inner error" . into ( ) ) )
209+ } ) ;
210+ let mut svc = RuntimeApiClientService :: new ( failing_inner, client) ;
211+
212+ let result = svc. call ( make_invocation ( ) ) . await ;
213+ assert ! ( result. is_err( ) ) ;
214+ }
215+ }
0 commit comments