Skip to content

Commit 8ed50a3

Browse files
feat: gate concurrent APIs behind experimental feature
- add experimental-concurrency feature in lambda-runtime and lambda-http - cfg-gate concurrent entrypoints and streaming helpers - keep run() guard and X-Ray header source on default path (partial gating) - fix default clippy by gating concurrent-only imports - update docs and concurrent example dependency
1 parent e2ec3b0 commit 8ed50a3

7 files changed

Lines changed: 54 additions & 19 deletions

File tree

examples/basic-lambda-concurrent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7-
lambda_runtime = { path = "../../lambda-runtime" }
7+
lambda_runtime = { path = "../../lambda-runtime", features = ["experimental-concurrency"] }
88
serde = "1.0.219"
99
tokio = { version = "1", features = ["macros"] }

lambda-http/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ opentelemetry = ["lambda_runtime/opentelemetry"] # enables access to the OpenTel
2929
anyhow = ["lambda_runtime/anyhow"] # enables From<T> for Diagnostic for anyhow error types, see README.md for more info
3030
eyre = ["lambda_runtime/eyre"] # enables From<T> for Diagnostic for eyre error types, see README.md for more info
3131
miette = ["lambda_runtime/miette"] # enables From<T> for Diagnostic for miette error types, see README.md for more info
32+
experimental-concurrency = ["lambda_runtime/experimental-concurrency"]
3233

3334
[dependencies]
3435
bytes = { workspace = true }

lambda-http/src/lib.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ use std::{
102102
};
103103

104104
mod streaming;
105-
pub use streaming::{run_with_streaming_response, run_with_streaming_response_concurrent, StreamAdapter};
105+
#[cfg(feature = "experimental-concurrency")]
106+
pub use streaming::run_with_streaming_response_concurrent;
107+
pub use streaming::{run_with_streaming_response, StreamAdapter};
106108

107109
/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
108110
pub type Request = http::Request<Body>;
@@ -195,7 +197,7 @@ where
195197
let LambdaEvent { payload, context } = req;
196198
let request_origin = payload.request_origin();
197199
let mut event: Request = payload.into();
198-
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
200+
update_xray_trace_id_header(event.headers_mut(), &context);
199201
let fut = Box::pin(self.service.call(event.with_lambda_context(context)));
200202

201203
TransformResponse::Request(request_origin, fut)
@@ -211,8 +213,9 @@ where
211213
/// # Managed concurrency
212214
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
213215
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
214-
/// prefer [`run_concurrent`], which honors managed concurrency and falls back to
215-
/// sequential behavior when unset.
216+
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
217+
/// which honors managed concurrency and falls back to sequential behavior when
218+
/// unset.
216219
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
217220
where
218221
S: Service<Request, Response = R, Error = E>,
@@ -226,11 +229,15 @@ where
226229
/// Starts the Lambda Rust runtime in a mode that is compatible with
227230
/// Lambda Managed Instances (concurrent invocations).
228231
///
232+
/// Requires the `experimental-concurrency` feature.
233+
///
229234
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
230235
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
231236
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
232237
/// it falls back to the same sequential behavior as [`run`], so the same
233238
/// handler can run on both classic Lambda and Lambda Managed Instances.
239+
#[cfg(feature = "experimental-concurrency")]
240+
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
234241
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
235242
where
236243
S: Service<Request, Response = R, Error = E> + Clone + Send + 'static,
@@ -241,8 +248,8 @@ where
241248
lambda_runtime::run_concurrent(Adapter::from(handler)).await
242249
}
243250

244-
// Replaces update_xray_trace_id_header (env var), now set from Context
245-
fn update_xray_trace_id_header_from_context(headers: &mut http::HeaderMap, context: &Context) {
251+
// In concurrent mode we must use the per-request context.
252+
fn update_xray_trace_id_header(headers: &mut http::HeaderMap, context: &Context) {
246253
if let Some(trace_id) = context.xray_trace_id.as_deref() {
247254
if let Ok(header_value) = http::HeaderValue::from_str(trace_id) {
248255
headers.insert(http::header::HeaderName::from_static("x-amzn-trace-id"), header_value);

lambda-http/src/streaming.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use crate::{
2-
http::header::SET_COOKIE, request::LambdaRequest, update_xray_trace_id_header_from_context, Request, RequestExt,
3-
};
1+
use crate::{http::header::SET_COOKIE, request::LambdaRequest, update_xray_trace_id_header, Request, RequestExt};
42
use bytes::Bytes;
53
use core::{
64
fmt::Debug,
@@ -76,7 +74,7 @@ where
7674
fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
7775
let LambdaEvent { payload, context } = req;
7876
let mut event: Request = payload.into();
79-
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
77+
update_xray_trace_id_header(event.headers_mut(), &context);
8078
Box::pin(
8179
self.service
8280
.call(event.with_lambda_context(context))
@@ -116,8 +114,10 @@ where
116114

117115
/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
118116
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
117+
#[cfg(feature = "experimental-concurrency")]
119118
type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request;
120119

120+
#[cfg(feature = "experimental-concurrency")]
121121
#[allow(clippy::type_complexity)]
122122
fn into_stream_service_cloneable<S, B, E>(
123123
handler: S,
@@ -166,7 +166,7 @@ where
166166
fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
167167
let LambdaEvent { payload, context } = req;
168168
let mut event: Request = payload.into();
169-
update_xray_trace_id_header_from_context(event.headers_mut(), &context);
169+
update_xray_trace_id_header(event.headers_mut(), &context);
170170
event.with_lambda_context(context)
171171
}
172172

@@ -178,7 +178,7 @@ fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
178178
/// # Managed concurrency
179179
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
180180
/// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`]
181-
/// instead.
181+
/// (requires the `experimental-concurrency` feature) instead.
182182
///
183183
/// [AWS docs for response streaming]:
184184
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
@@ -197,9 +197,13 @@ where
197197
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
198198
/// responses, in a mode that is compatible with Lambda Managed Instances.
199199
///
200+
/// Requires the `experimental-concurrency` feature.
201+
///
200202
/// This uses a cloneable, boxed service internally so it can be driven by the
201203
/// concurrent runtime. When `AWS_LAMBDA_MAX_CONCURRENCY` is not set or `<= 1`,
202204
/// it falls back to the same sequential behavior as [`run_with_streaming_response`].
205+
#[cfg(feature = "experimental-concurrency")]
206+
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
203207
pub async fn run_with_streaming_response_concurrent<S, B, E>(handler: S) -> Result<(), Error>
204208
where
205209
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,

lambda-runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ miette = ["dep:miette"] # enables From<T> for Diagnostic for miette error types,
2525
# as well as default features
2626
# https://github.com/aws/aws-lambda-rust-runtime/issues/984
2727
graceful-shutdown = ["tokio/rt", "tokio/signal", "dep:lambda-extension"]
28+
experimental-concurrency = []
2829

2930
[dependencies]
3031
anyhow = { version = "1.0.86", optional = true }

lambda-runtime/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ where
9797
/// # Managed concurrency
9898
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
9999
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
100-
/// prefer [`run_concurrent`], which honors managed concurrency and falls back to
101-
/// sequential behavior when unset.
100+
/// prefer [`run_concurrent`] (requires the `experimental-concurrency` feature),
101+
/// which honors managed concurrency and falls back to sequential behavior when
102+
/// unset.
102103
///
103104
/// # Example
104105
/// ```no_run
@@ -135,6 +136,8 @@ where
135136
/// Starts the Lambda Rust runtime in a mode that is compatible with
136137
/// Lambda Managed Instances (concurrent invocations).
137138
///
139+
/// Requires the `experimental-concurrency` feature.
140+
///
138141
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
139142
/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks, each running its own
140143
/// `/next` polling loop. When the environment variable is unset or `<= 1`, it
@@ -160,6 +163,8 @@ where
160163
/// Ok(event.payload)
161164
/// }
162165
/// ```
166+
#[cfg(feature = "experimental-concurrency")]
167+
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
163168
pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
164169
where
165170
F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,

lambda-runtime/src/runtime.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@ use crate::{
44
types::{invoke_request_id, IntoFunctionResponse, LambdaEvent},
55
Config, Context, Diagnostic,
66
};
7+
#[cfg(feature = "experimental-concurrency")]
78
use futures::stream::FuturesUnordered;
89
use http_body_util::BodyExt;
910
use lambda_runtime_api_client::{BoxError, Client as ApiClient};
1011
use serde::{Deserialize, Serialize};
11-
use std::{env, fmt, fmt::Debug, future::Future, io, sync::Arc};
12+
#[cfg(feature = "experimental-concurrency")]
13+
use std::fmt;
14+
use std::{env, fmt::Debug, future::Future, io, sync::Arc};
1215
use tokio_stream::{Stream, StreamExt};
1316
use tower::{Layer, Service, ServiceExt};
14-
use tracing::{debug, error, info_span, trace, warn, Instrument};
17+
use tracing::trace;
18+
#[cfg(feature = "experimental-concurrency")]
19+
use tracing::{debug, error, info_span, warn, Instrument};
1520

1621
/* ----------------------------------------- INVOCATION ---------------------------------------- */
1722

@@ -149,6 +154,7 @@ impl<S> Runtime<S> {
149154
}
150155
}
151156

157+
#[cfg(feature = "experimental-concurrency")]
152158
impl<S> Runtime<S>
153159
where
154160
S: Service<LambdaInvocation, Response = (), Error = BoxError> + Clone + Send + 'static,
@@ -159,6 +165,7 @@ where
159165
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is not set or is `<= 1`, this falls back to the
160166
/// sequential `run_with_incoming` loop so that the same handler can run on both
161167
/// classic Lambda and Lambda Managed Instances.
168+
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-concurrency")))]
162169
pub async fn run_concurrent(self) -> Result<(), BoxError> {
163170
if self.concurrency_limit > 1 {
164171
trace!("Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id");
@@ -252,17 +259,20 @@ where
252259
}
253260
}
254261

262+
#[cfg(feature = "experimental-concurrency")]
255263
#[derive(Debug)]
256264
enum WorkerError {
257265
CleanExit(tokio::task::Id),
258266
Failure(tokio::task::Id, BoxError),
259267
}
260268

269+
#[cfg(feature = "experimental-concurrency")]
261270
#[derive(Debug)]
262271
struct ConcurrentWorkerErrors {
263272
errors: Vec<WorkerError>,
264273
}
265274

275+
#[cfg(feature = "experimental-concurrency")]
266276
#[derive(Serialize)]
267277
struct ConcurrentWorkerErrorsPayload<'a> {
268278
message: &'a str,
@@ -272,12 +282,14 @@ struct ConcurrentWorkerErrorsPayload<'a> {
272282
failures: Vec<WorkerFailurePayload>,
273283
}
274284

285+
#[cfg(feature = "experimental-concurrency")]
275286
#[derive(Serialize)]
276287
struct WorkerFailurePayload {
277288
id: String,
278289
err: String,
279290
}
280291

292+
#[cfg(feature = "experimental-concurrency")]
281293
impl fmt::Display for ConcurrentWorkerErrors {
282294
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283295
let mut clean = Vec::new();
@@ -314,6 +326,7 @@ impl fmt::Display for ConcurrentWorkerErrors {
314326
}
315327
}
316328

329+
#[cfg(feature = "experimental-concurrency")]
317330
impl std::error::Error for ConcurrentWorkerErrors {}
318331

319332
impl<S> Runtime<S>
@@ -323,11 +336,12 @@ where
323336
/// Start the runtime and begin polling for events on the Lambda Runtime API.
324337
///
325338
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable
326-
/// concurrent polling. Use [`Runtime::run_concurrent`] instead.
339+
/// concurrent polling. Enable the `experimental-concurrency` feature and use
340+
/// [`Runtime::run_concurrent`] instead.
327341
pub async fn run(self) -> Result<(), BoxError> {
328342
if let Some(raw) = concurrency_env_value() {
329343
return Err(Box::new(io::Error::other(format!(
330-
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; use Runtime::run_concurrent instead"
344+
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; enable the experimental-concurrency feature and use Runtime::run_concurrent instead"
331345
))));
332346
}
333347
let incoming = incoming(&self.client);
@@ -398,6 +412,7 @@ fn incoming(
398412
}
399413

400414
/// Creates a future that polls the `/next` endpoint.
415+
#[cfg(feature = "experimental-concurrency")]
401416
async fn next_event_future(client: &ApiClient) -> Result<http::Response<hyper::body::Incoming>, BoxError> {
402417
let req = NextEventRequest.into_req()?;
403418
client.call(req).await
@@ -414,6 +429,7 @@ fn concurrency_env_value() -> Option<String> {
414429
env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok()
415430
}
416431

432+
#[cfg(feature = "experimental-concurrency")]
417433
async fn concurrent_worker_loop<S>(mut service: S, config: Arc<Config>, client: Arc<ApiClient>) -> Result<(), BoxError>
418434
where
419435
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
@@ -744,6 +760,7 @@ mod endpoint_tests {
744760
.await
745761
}
746762

763+
#[cfg(feature = "experimental-concurrency")]
747764
#[tokio::test]
748765
async fn concurrent_worker_crash_does_not_stop_other_workers() -> Result<(), Error> {
749766
let next_calls = Arc::new(AtomicUsize::new(0));

0 commit comments

Comments
 (0)