Skip to content

Commit ced94fa

Browse files
committed
refactor(lambda-managed-instances): warn on run() with `AWS_LAMBDA_MAX_CONCURRENCY, rename feature experimental-concurrency -> concurrency->tokio
1 parent 079087d commit ced94fa

7 files changed

Lines changed: 83 additions & 49 deletions

File tree

examples/basic-lambda-concurrent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7-
lambda_runtime = { path = "../../lambda-runtime", features = ["experimental-concurrency"] }
7+
lambda_runtime = { path = "../../lambda-runtime", features = ["concurrency-tokio"] }
88
serde = "1.0.219"
99
tokio = { version = "1", features = ["macros"] }

lambda-http/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ opentelemetry = ["lambda_runtime/opentelemetry"] # enables access to the OpenTel
2929
anyhow = ["lambda_runtime/anyhow"] # enables From<T> for Diagnostic for anyhow error types, see README.md for more info
3030
eyre = ["lambda_runtime/eyre"] # enables From<T> for Diagnostic for eyre error types, see README.md for more info
3131
miette = ["lambda_runtime/miette"] # enables From<T> for Diagnostic for miette error types, see README.md for more info
32-
experimental-concurrency = ["lambda_runtime/experimental-concurrency"]
32+
concurrency-tokio = ["lambda_runtime/concurrency-tokio"]
3333

3434
[dependencies]
3535
bytes = { workspace = true }

lambda-http/src/lib.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use std::{
102102
};
103103

104104
mod streaming;
105-
#[cfg(feature = "experimental-concurrency")]
105+
#[cfg(feature = "concurrency-tokio")]
106106
pub use streaming::run_with_streaming_response_concurrent;
107107
pub use streaming::{run_with_streaming_response, StreamAdapter};
108108

@@ -213,7 +213,7 @@ where
213213
/// # Managed concurrency
214214
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
215215
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
216-
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
216+
/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
217217
/// which honors managed concurrency and falls back to sequential behavior when
218218
/// unset.
219219
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
@@ -226,18 +226,32 @@ where
226226
lambda_runtime::run(Adapter::from(handler)).await
227227
}
228228

229-
/// Starts the Lambda Rust runtime in a mode that is compatible with
230-
/// Lambda Managed Instances (concurrent invocations).
229+
/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
230+
/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
231231
///
232-
/// Requires the `experimental-concurrency` feature.
232+
/// This takes care of transforming the LambdaEvent into a [`Request`] and then
233+
/// converting the result into a `LambdaResponse`.
234+
///
235+
/// # Managed concurrency
233236
///
234237
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
235-
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
236-
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
237-
/// it falls back to the same sequential behavior as [`run`], so the same
238-
/// handler can run on both classic Lambda and Lambda Managed Instances.
239-
#[cfg(feature = "experimental-concurrency")]
240-
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
238+
/// function spawns multiple worker tasks to handle concurrent invocations.
239+
/// When the environment variable is unset or `<= 1`, it falls back to
240+
/// sequential behavior, so the same handler can run on both classic Lambda
241+
/// and Lambda Managed Instances.
242+
///
243+
/// This feature requires `Clone + Send` bounds on the handler. If your handler
244+
/// cannot satisfy these bounds, use `run()` for sequential processing.
245+
///
246+
/// # Panics
247+
///
248+
/// This function panics if:
249+
/// - Called outside of a Tokio runtime with `AWS_LAMBDA_MAX_CONCURRENCY > 1`
250+
/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
251+
/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
252+
/// `AWS_LAMBDA_RUNTIME_API`)
253+
#[cfg(feature = "concurrency-tokio")]
254+
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
241255
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
242256
where
243257
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,

lambda-http/src/streaming.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ where
114114

115115
/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
116116
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
117-
#[cfg(feature = "experimental-concurrency")]
117+
#[cfg(feature = "concurrency-tokio")]
118118
type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request;
119119

120-
#[cfg(feature = "experimental-concurrency")]
120+
#[cfg(feature = "concurrency-tokio")]
121121
#[allow(clippy::type_complexity)]
122122
fn into_stream_service_cloneable<S, B, E>(
123123
handler: S,
@@ -178,7 +178,7 @@ fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
178178
/// # Managed concurrency
179179
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
180180
/// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`]
181-
/// (requires the `experimental-concurrency` feature) instead.
181+
/// (requires the `concurrency-tokio` feature) instead.
182182
///
183183
/// [AWS docs for response streaming]:
184184
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
@@ -197,13 +197,13 @@ where
197197
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
198198
/// responses, in a mode that is compatible with Lambda Managed Instances.
199199
///
200-
/// Requires the `experimental-concurrency` feature.
200+
/// Requires the `concurrency-tokio` feature.
201201
///
202202
/// This uses a cloneable, boxed service internally so it can be driven by the
203203
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
204204
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
205-
#[cfg(feature = "experimental-concurrency")]
206-
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
205+
#[cfg(feature = "concurrency-tokio")]
206+
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
207207
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
208208
where
209209
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,

lambda-runtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ miette = ["dep:miette"] # enables From<T> for Diagnostic for miette error types,
2525
# as well as default features
2626
# https://github.com/aws/aws-lambda-rust-runtime/issues/984
2727
graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"]
28-
experimental-concurrency = []
28+
concurrency-tokio = []
2929

3030
[dependencies]
3131
anyhow = { version = "1.0.86", optional = true }

lambda-runtime/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ where
9797
/// # Managed concurrency
9898
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
9999
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
100-
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
100+
/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
101101
/// which honors managed concurrency and falls back to sequential behavior when
102102
/// unset.
103103
///
@@ -136,7 +136,7 @@ where
136136
/// Starts the Lambda Rust runtime in a mode that is compatible with
137137
/// Lambda Managed Instances (concurrent invocations).
138138
///
139-
/// Requires the `experimental-concurrency` feature.
139+
/// Requires the `concurrency-tokio` feature.
140140
///
141141
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
142142
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
@@ -163,8 +163,8 @@ where
163163
/// Ok(event.payload)
164164
/// }
165165
/// ```
166-
#[cfg(feature = "experimental-concurrency")]
167-
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
166+
#[cfg(feature = "concurrency-tokio")]
167+
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
168168
pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
169169
where
170170
F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,

lambda-runtime/src/runtime.rs

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@ use crate::{
44
types::{invoke_request_id, IntoFunctionResponse, LambdaEvent},
55
Config, Context, Diagnostic,
66
};
7-
#[cfg(feature = "experimental-concurrency")]
7+
#[cfg(feature = "concurrency-tokio")]
88
use futures::stream::FuturesUnordered;
99
use http_body_util::BodyExt;
1010
use lambda_runtime_api_client::{BoxError, Client as ApiClient};
1111
use serde::{Deserialize, Serialize};
12-
#[cfg(feature = "experimental-concurrency")]
12+
#[cfg(feature = "concurrency-tokio")]
1313
use std::fmt;
1414
use std::{env, fmt::Debug, future::Future, io, sync::Arc};
1515
use tokio_stream::{Stream, StreamExt};
1616
use tower::{Layer, Service, ServiceExt};
1717
use tracing::trace;
18-
#[cfg(feature = "experimental-concurrency")]
18+
#[cfg(feature = "concurrency-tokio")]
1919
use tracing::{debug, error, info_span, warn, Instrument};
2020

2121
/* ----------------------------------------- INVOCATION ---------------------------------------- */
@@ -96,6 +96,13 @@ where
9696
/// Note that manually creating a [Runtime] does not add tracing to the executed handler
9797
/// as is done by [super::run]. If you want to add the default tracing functionality, call
9898
/// [Runtime::layer] with a [super::layers::TracingLayer].
99+
///
100+
///
101+
/// # Panics
102+
///
103+
/// This function panics if required Lambda environment variables are missing
104+
/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
105+
/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
99106
pub fn new(handler: F) -> Self {
100107
trace!("Loading config from env");
101108
let config = Arc::new(Config::from_env());
@@ -154,18 +161,28 @@ impl<S> Runtime<S> {
154161
}
155162
}
156163

157-
#[cfg(feature = "experimental-concurrency")]
164+
#[cfg(feature = "concurrency-tokio")]
158165
impl<S> Runtime<S>
159166
where
160167
S: Service<LambdaInvocation, Response = (), Error = BoxError> + Clone + Send + 'static,
161168
S::Future: Send,
162169
{
163-
/// Start the runtime in concurrent mode when configured for Lambda managed-concurrency.
170+
/// Start the runtime and begin polling for events on the Lambda Runtime API, in a mode
171+
/// that supports Lambda-managed concurrency.
172+
///
173+
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
174+
/// spawns multiple worker tasks to handle concurrent invocations. When the
175+
/// environment variable is unset or `<= 1`, it falls back to sequential
176+
/// behavior, so the same handler can run on both classic Lambda and Lambda
177+
/// Managed Instances.
164178
///
165-
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is not set or is `<= 1`, this falls back to the
166-
/// sequential `run_with_incoming` loop so that the same handler can run on both
167-
/// classic Lambda and Lambda Managed Instances.
168-
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
179+
/// This feature requires `Clone + Send + 'static` bounds on the service. If your handler
180+
/// cannot satisfy these bounds, use [Runtime::run] instead.
181+
///
182+
/// # Panics
183+
///
184+
/// This function panics if called outside of a Tokio runtime with `AWS_LAMBDA_MAX_CONCURRENCY > 1`
185+
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
169186
pub async fn run_concurrent(self) -> Result<(), BoxError> {
170187
if self.concurrency_limit > 1 {
171188
trace!("Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id");
@@ -259,20 +276,20 @@ where
259276
}
260277
}
261278

262-
#[cfg(feature = "experimental-concurrency")]
279+
#[cfg(feature = "concurrency-tokio")]
263280
#[derive(Debug)]
264281
enum WorkerError {
265282
CleanExit(tokio::task::Id),
266283
Failure(tokio::task::Id, BoxError),
267284
}
268285

269-
#[cfg(feature = "experimental-concurrency")]
286+
#[cfg(feature = "concurrency-tokio")]
270287
#[derive(Debug)]
271288
struct ConcurrentWorkerErrors {
272289
errors: Vec<WorkerError>,
273290
}
274291

275-
#[cfg(feature = "experimental-concurrency")]
292+
#[cfg(feature = "concurrency-tokio")]
276293
#[derive(Serialize)]
277294
struct ConcurrentWorkerErrorsPayload<'a> {
278295
message: &'a str,
@@ -282,14 +299,14 @@ struct ConcurrentWorkerErrorsPayload<'a> {
282299
failures: Vec<WorkerFailurePayload>,
283300
}
284301

285-
#[cfg(feature = "experimental-concurrency")]
302+
#[cfg(feature = "concurrency-tokio")]
286303
#[derive(Serialize)]
287304
struct WorkerFailurePayload {
288305
id: String,
289306
err: String,
290307
}
291308

292-
#[cfg(feature = "experimental-concurrency")]
309+
#[cfg(feature = "concurrency-tokio")]
293310
impl fmt::Display for ConcurrentWorkerErrors {
294311
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295312
let mut clean = Vec::new();
@@ -326,7 +343,7 @@ impl fmt::Display for ConcurrentWorkerErrors {
326343
}
327344
}
328345

329-
#[cfg(feature = "experimental-concurrency")]
346+
#[cfg(feature = "concurrency-tokio")]
330347
impl std::error::Error for ConcurrentWorkerErrors {}
331348

332349
impl<S> Runtime<S>
@@ -335,14 +352,17 @@ where
335352
{
336353
/// Start the runtime and begin polling for events on the Lambda Runtime API.
337354
///
338-
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable
339-
/// concurrent polling. Enable the `experimental-concurrency` feature and use
340-
/// [`Runtime::run_concurrent`] instead.
355+
/// The runtime will process requests sequentially.
356+
///
357+
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
358+
/// Enable the `concurrency-tokio` feature and use [`Runtime::run_concurrent`]
359+
/// to enable concurrent request processing with Lambda Managed Instances.
341360
pub async fn run(self) -> Result<(), BoxError> {
342361
if let Some(raw) = concurrency_env_value() {
343-
return Err(Box::new(io::Error::other(format!(
344-
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; enable the experimental-concurrency feature and use Runtime::run_concurrent instead"
345-
))));
362+
tracing::warn!(
363+
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{}', but the concurrency-tokio feature is not enabled; running sequentially",
364+
raw
365+
);
346366
}
347367
let incoming = incoming(&self.client);
348368
Self::run_with_incoming(self.service, self.config, incoming).await
@@ -412,7 +432,7 @@ fn incoming(
412432
}
413433

414434
/// Creates a future that polls the `/next` endpoint.
415-
#[cfg(feature = "experimental-concurrency")]
435+
#[cfg(feature = "concurrency-tokio")]
416436
async fn next_event_future(client: &ApiClient) -> Result<http::Response<hyper::body::Incoming>, BoxError> {
417437
let req = NextEventRequest.into_req()?;
418438
client.call(req).await
@@ -429,7 +449,7 @@ fn concurrency_env_value() -> Option<String> {
429449
env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok()
430450
}
431451

432-
#[cfg(feature = "experimental-concurrency")]
452+
#[cfg(feature = "concurrency-tokio")]
433453
async fn concurrent_worker_loop<S>(mut service: S, config: Arc<Config>, client: Arc<ApiClient>) -> Result<(), BoxError>
434454
where
435455
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
@@ -760,7 +780,7 @@ mod endpoint_tests {
760780
.await
761781
}
762782

763-
#[cfg(feature = "experimental-concurrency")]
783+
#[cfg(feature = "concurrency-tokio")]
764784
#[tokio::test]
765785
async fn concurrent_worker_crash_does_not_stop_other_workers() -> Result<(), Error> {
766786
let next_calls = Arc::new(AtomicUsize::new(0));
@@ -910,7 +930,7 @@ mod endpoint_tests {
910930
}
911931

912932
#[tokio::test]
913-
#[cfg(feature = "experimental-concurrency")]
933+
#[cfg(feature = "concurrency-tokio")]
914934
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
915935
use std::collections::HashSet;
916936
use tracing::info;

0 commit comments

Comments
 (0)