@@ -4,18 +4,18 @@ use crate::{
44 types:: { invoke_request_id, IntoFunctionResponse , LambdaEvent } ,
55 Config , Context , Diagnostic ,
66} ;
7- #[ cfg( feature = "experimental- concurrency" ) ]
7+ #[ cfg( feature = "concurrency-tokio " ) ]
88use futures:: stream:: FuturesUnordered ;
99use http_body_util:: BodyExt ;
1010use lambda_runtime_api_client:: { BoxError , Client as ApiClient } ;
1111use serde:: { Deserialize , Serialize } ;
12- #[ cfg( feature = "experimental- concurrency" ) ]
12+ #[ cfg( feature = "concurrency-tokio " ) ]
1313use std:: fmt;
14- use std:: { env, fmt:: Debug , future:: Future , io , sync:: Arc } ;
14+ use std:: { env, fmt:: Debug , future:: Future , sync:: Arc } ;
1515use tokio_stream:: { Stream , StreamExt } ;
1616use tower:: { Layer , Service , ServiceExt } ;
1717use tracing:: trace;
18- #[ cfg( feature = "experimental- concurrency" ) ]
18+ #[ cfg( feature = "concurrency-tokio " ) ]
1919use tracing:: { debug, error, info_span, warn, Instrument } ;
2020
2121/* ----------------------------------------- INVOCATION ---------------------------------------- */
9696 /// Note that manually creating a [Runtime] does not add tracing to the executed handler
9797 /// as is done by [super::run]. If you want to add the default tracing functionality, call
9898 /// [Runtime::layer] with a [super::layers::TracingLayer].
99+ ///
100+ ///
101+ /// # Panics
102+ ///
103+ /// This function panics if required Lambda environment variables are missing
104+ /// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
105+ /// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
99106 pub fn new ( handler : F ) -> Self {
100107 trace ! ( "Loading config from env" ) ;
101108 let config = Arc :: new ( Config :: from_env ( ) ) ;
@@ -154,19 +161,30 @@ impl<S> Runtime<S> {
154161 }
155162}
156163
157- #[ cfg( feature = "experimental- concurrency" ) ]
164+ #[ cfg( feature = "concurrency-tokio " ) ]
158165impl < S > Runtime < S >
159166where
160167 S : Service < LambdaInvocation , Response = ( ) , Error = BoxError > + Clone + Send + ' static ,
161168 S :: Future : Send ,
162169{
163- /// Start the runtime in concurrent mode when configured for Lambda managed-concurrency.
170+ /// Start the runtime and begin polling for events on the Lambda Runtime API,
171+ /// in a mode that is compatible with Lambda Managed Instances.
172+ ///
173+ /// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
174+ /// spawns multiple tokio worker tasks to handle concurrent invocations. When the
175+ /// environment variable is unset or `<= 1`, it falls back to sequential
176+ /// behavior, so the same handler can run on both classic Lambda and Lambda
177+ /// Managed Instances.
178+ ///
179+ /// # Panics
164180 ///
165- /// If `AWS_LAMBDA_MAX_CONCURRENCY` is not set or is `<= 1`, this falls back to the
166- /// sequential `run_with_incoming` loop so that the same handler can run on both
167- /// classic Lambda and Lambda Managed Instances.
168- #[ cfg_attr( docsrs, doc( cfg( feature = "experimental-concurrency" ) ) ) ]
181+ /// This function panics if called outside of a Tokio runtime.
182+ #[ cfg_attr( docsrs, doc( cfg( feature = "concurrency-tokio" ) ) ) ]
169183 pub async fn run_concurrent ( self ) -> Result < ( ) , BoxError > {
184+ if tokio:: runtime:: Handle :: try_current ( ) . is_err ( ) {
185+ panic ! ( "`run_concurrent` must be called from within a Tokio runtime" ) ;
186+ }
187+
170188 if self . concurrency_limit > 1 {
171189 trace ! ( "Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id" ) ;
172190 Self :: run_concurrent_inner ( self . service , self . config , self . client , self . concurrency_limit ) . await
@@ -259,20 +277,20 @@ where
259277 }
260278}
261279
262- #[ cfg( feature = "experimental- concurrency" ) ]
280+ #[ cfg( feature = "concurrency-tokio " ) ]
263281#[ derive( Debug ) ]
264282enum WorkerError {
265283 CleanExit ( tokio:: task:: Id ) ,
266284 Failure ( tokio:: task:: Id , BoxError ) ,
267285}
268286
269- #[ cfg( feature = "experimental- concurrency" ) ]
287+ #[ cfg( feature = "concurrency-tokio " ) ]
270288#[ derive( Debug ) ]
271289struct ConcurrentWorkerErrors {
272290 errors : Vec < WorkerError > ,
273291}
274292
275- #[ cfg( feature = "experimental- concurrency" ) ]
293+ #[ cfg( feature = "concurrency-tokio " ) ]
276294#[ derive( Serialize ) ]
277295struct ConcurrentWorkerErrorsPayload < ' a > {
278296 message : & ' a str ,
@@ -282,14 +300,14 @@ struct ConcurrentWorkerErrorsPayload<'a> {
282300 failures : Vec < WorkerFailurePayload > ,
283301}
284302
285- #[ cfg( feature = "experimental- concurrency" ) ]
303+ #[ cfg( feature = "concurrency-tokio " ) ]
286304#[ derive( Serialize ) ]
287305struct WorkerFailurePayload {
288306 id : String ,
289307 err : String ,
290308}
291309
292- #[ cfg( feature = "experimental- concurrency" ) ]
310+ #[ cfg( feature = "concurrency-tokio " ) ]
293311impl fmt:: Display for ConcurrentWorkerErrors {
294312 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
295313 let mut clean = Vec :: new ( ) ;
@@ -326,7 +344,7 @@ impl fmt::Display for ConcurrentWorkerErrors {
326344 }
327345}
328346
329- #[ cfg( feature = "experimental- concurrency" ) ]
347+ #[ cfg( feature = "concurrency-tokio " ) ]
330348impl std:: error:: Error for ConcurrentWorkerErrors { }
331349
332350impl < S > Runtime < S >
@@ -335,14 +353,20 @@ where
335353{
336354 /// Start the runtime and begin polling for events on the Lambda Runtime API.
337355 ///
338- /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable
339- /// concurrent polling. Enable the `experimental-concurrency` feature and use
340- /// [`Runtime::run_concurrent`] instead.
356+ /// The runtime will process requests sequentially.
357+ ///
358+ /// # Managed concurrency
359+ /// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
360+ /// If your handler can satisfy `Clone + Send + 'static`,
361+ /// prefer [`Runtime::run_concurrent`] (requires the `concurrency-tokio` feature),
362+ /// which honors managed concurrency and falls back to sequential behavior when
363+ /// unset.
341364 pub async fn run ( self ) -> Result < ( ) , BoxError > {
342365 if let Some ( raw) = concurrency_env_value ( ) {
343- return Err ( Box :: new ( io:: Error :: other ( format ! (
344- "AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; enable the experimental-concurrency feature and use Runtime::run_concurrent instead"
345- ) ) ) ) ;
366+ tracing:: warn!(
367+ "AWS_LAMBDA_MAX_CONCURRENCY is set to '{}', but the concurrency-tokio feature is not enabled; running sequentially" ,
368+ raw
369+ ) ;
346370 }
347371 let incoming = incoming ( & self . client ) ;
348372 Self :: run_with_incoming ( self . service , self . config , incoming) . await
@@ -412,7 +436,7 @@ fn incoming(
412436}
413437
414438/// Creates a future that polls the `/next` endpoint.
415- #[ cfg( feature = "experimental- concurrency" ) ]
439+ #[ cfg( feature = "concurrency-tokio " ) ]
416440async fn next_event_future ( client : & ApiClient ) -> Result < http:: Response < hyper:: body:: Incoming > , BoxError > {
417441 let req = NextEventRequest . into_req ( ) ?;
418442 client. call ( req) . await
@@ -429,7 +453,7 @@ fn concurrency_env_value() -> Option<String> {
429453 env:: var ( "AWS_LAMBDA_MAX_CONCURRENCY" ) . ok ( )
430454}
431455
432- #[ cfg( feature = "experimental- concurrency" ) ]
456+ #[ cfg( feature = "concurrency-tokio " ) ]
433457async fn concurrent_worker_loop < S > ( mut service : S , config : Arc < Config > , client : Arc < ApiClient > ) -> Result < ( ) , BoxError >
434458where
435459 S : Service < LambdaInvocation , Response = ( ) , Error = BoxError > ,
@@ -760,7 +784,7 @@ mod endpoint_tests {
760784 . await
761785 }
762786
763- #[ cfg( feature = "experimental- concurrency" ) ]
787+ #[ cfg( feature = "concurrency-tokio " ) ]
764788 #[ tokio:: test]
765789 async fn concurrent_worker_crash_does_not_stop_other_workers ( ) -> Result < ( ) , Error > {
766790 let next_calls = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
@@ -910,7 +934,7 @@ mod endpoint_tests {
910934 }
911935
912936 #[ tokio:: test]
913- #[ cfg( feature = "experimental- concurrency" ) ]
937+ #[ cfg( feature = "concurrency-tokio " ) ]
914938 async fn test_concurrent_structured_logging_isolation ( ) -> Result < ( ) , Error > {
915939 use std:: collections:: HashSet ;
916940 use tracing:: info;
0 commit comments