Skip to content

Commit 3ddc948

Browse files
runtime: improve concurrent worker supervision
- aggregate worker exits with task IDs and surface clean exits in errors
- add per-worker spans and log unexpected exits at error level
- guard run() when AWS_LAMBDA_MAX_CONCURRENCY is set; docs prefer run_concurrent
- make streaming adapter cloneable for concurrent HTTP path
- document pool size guidance and LMI test target; log example via tracing
1 parent 949c002 commit 3ddc948

7 files changed

Lines changed: 164 additions & 58 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,12 @@ make test-rie EXAMPLE=basic-sqs
405405
make test-rie EXAMPLE=http-basic-lambda
406406
```
407407

408+
To test Lambda Managed Instances (concurrent polling), use:
409+
410+
```bash
411+
make test-rie-lmi EXAMPLE=basic-lambda-concurrent
412+
```
413+
408414
This command will:
409415
1. Build a Docker image with Rust toolchain and RIE
410416
2. Compile the specified example inside the Linux container

examples/basic-lambda-concurrent/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async fn main() -> Result<(), Error> {
2929

3030
let func = service_fn(my_handler);
3131
if let Err(err) = lambda_runtime::run_concurrent(func).await {
32-
eprintln!("run error: {:?}", err);
32+
tracing::error!(error = %err, "run error");
3333
return Err(err);
3434
}
3535
Ok(())

lambda-http/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ where
207207
///
208208
/// This takes care of transforming the LambdaEvent into a [`Request`] and then
209209
/// converting the result into a `LambdaResponse`.
210+
///
211+
/// # Managed concurrency
212+
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
213+
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
214+
/// prefer [`run_concurrent`], which honors managed concurrency and falls back to
215+
/// sequential behavior when unset.
210216
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
211217
where
212218
S: Service<Request, Response = R, Error = E>,

lambda-http/src/streaming.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub use http::{self, Response};
1212
use http_body::Body;
1313
use lambda_runtime::{
1414
tower::{
15-
util::{BoxCloneService, MapRequest, MapResponse},
15+
util::{MapRequest, MapResponse},
1616
ServiceBuilder, ServiceExt,
1717
},
1818
Diagnostic,
@@ -29,6 +29,18 @@ pub struct StreamAdapter<'a, S, B> {
2929
_phantom_data: PhantomData<&'a B>,
3030
}
3131

32+
// Hand-written `Clone` for `StreamAdapter`: the only bound is `S: Clone`.
// `B` is referenced solely through the `PhantomData` marker field, so the copy
// clones the wrapped service and creates a fresh `PhantomData`.
impl<'a, S, B> Clone for StreamAdapter<'a, S, B>
33+
where
34+
S: Clone,
35+
{
36+
fn clone(&self) -> Self {
37+
Self {
38+
service: self.service.clone(),
39+
_phantom_data: PhantomData,
40+
}
41+
}
42+
}
43+
3244
impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
3345
where
3446
S: Service<Request, Response = Response<B>, Error = E>,
@@ -104,10 +116,12 @@ where
104116

105117
/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
106118
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
119+
type EventToRequest = fn(LambdaEvent<LambdaRequest>) -> Request;
120+
107121
#[allow(clippy::type_complexity)]
108-
fn into_stream_service_boxed<S, B, E>(
122+
fn into_stream_service_cloneable<S, B, E>(
109123
handler: S,
110-
) -> BoxCloneService<LambdaEvent<LambdaRequest>, StreamResponse<BodyStream<B>>, E>
124+
) -> MapResponse<MapRequest<S, EventToRequest>, impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone>
111125
where
112126
S: Service<Request, Response = Response<B>, Error = E> + Clone + Send + 'static,
113127
S::Future: Send + 'static,
@@ -116,12 +130,10 @@ where
116130
B::Data: Into<Bytes> + Send,
117131
B::Error: Into<Error> + Send + Debug,
118132
{
119-
let svc = ServiceBuilder::new()
120-
.map_request(event_to_request as fn(LambdaEvent<LambdaRequest>) -> Request)
133+
ServiceBuilder::new()
134+
.map_request(event_to_request as EventToRequest)
121135
.service(handler)
122-
.map_response(into_stream_response);
123-
124-
BoxCloneService::new(svc)
136+
.map_response(into_stream_response)
125137
}
126138

127139
/// Converts an `http::Response<B>` into a streaming Lambda response.
@@ -163,6 +175,11 @@ fn event_to_request(req: LambdaEvent<LambdaRequest>) -> Request {
163175
///
164176
/// See the [AWS docs for response streaming].
165177
///
178+
/// # Managed concurrency
179+
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
180+
/// it does not enable concurrent polling. Use [`run_with_streaming_response_concurrent`]
181+
/// instead.
182+
///
166183
/// [AWS docs for response streaming]:
167184
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
168185
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
@@ -192,7 +209,7 @@ where
192209
B::Data: Into<Bytes> + Send,
193210
B::Error: Into<Error> + Send + Debug,
194211
{
195-
lambda_runtime::run_concurrent(into_stream_service_boxed(handler)).await
212+
lambda_runtime::run_concurrent(into_stream_service_cloneable(handler)).await
196213
}
197214

198215
pin_project_lite::pin_project! {

lambda-runtime-api-client/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ impl ClientBuilder {
120120
}
121121

122122
/// Provide a pool size hint for the underlying Hyper client.
123+
///
124+
/// When using concurrent polling, this should be at least the maximum
125+
/// concurrency (e.g., `AWS_LAMBDA_MAX_CONCURRENCY`) to avoid connection
126+
/// starvation.
123127
pub fn with_pool_size(self, pool_size: usize) -> Self {
124128
Self {
125129
pool_size: Some(pool_size),

lambda-runtime/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ where
9494
/// If you need more control over the runtime and add custom middleware, use the
9595
/// [Runtime] type directly.
9696
///
97+
/// # Managed concurrency
98+
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this function returns an error because
99+
/// it does not enable concurrent polling. If your handler can satisfy `Clone`,
100+
/// prefer [`run_concurrent`], which honors managed concurrency and falls back to
101+
/// sequential behavior when unset.
102+
///
97103
/// # Example
98104
/// ```no_run
99105
/// use lambda_runtime::{Error, service_fn, LambdaEvent};

lambda-runtime/src/runtime.rs

Lines changed: 115 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,14 @@ use crate::{
44
types::{invoke_request_id, IntoFunctionResponse, LambdaEvent},
55
Config, Context, Diagnostic,
66
};
7-
use futures::stream::FuturesUnordered;
7+
use futures::{future::BoxFuture, stream::FuturesUnordered};
88
use http_body_util::BodyExt;
99
use lambda_runtime_api_client::{BoxError, Client as ApiClient};
1010
use serde::{Deserialize, Serialize};
11-
use std::{
12-
env,
13-
fmt::Debug,
14-
future::Future,
15-
io,
16-
sync::{Arc, OnceLock},
17-
};
11+
use std::{env, fmt, fmt::Debug, future::Future, io, sync::Arc};
1812
use tokio_stream::{Stream, StreamExt};
1913
use tower::{Layer, Service, ServiceExt};
20-
use tracing::{error, trace, warn};
14+
use tracing::{error, info_span, trace, warn, Instrument};
2115

2216
/* ----------------------------------------- INVOCATION ---------------------------------------- */
2317

@@ -63,9 +57,6 @@ pub struct Runtime<S> {
6357
concurrency_limit: u32,
6458
}
6559

66-
/// One-time marker to log X-Ray behavior in concurrent mode.
67-
static XRAY_LOGGED: OnceLock<()> = OnceLock::new();
68-
6960
impl<F, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>
7061
Runtime<
7162
RuntimeApiClientService<
@@ -170,6 +161,7 @@ where
170161
/// classic Lambda and Lambda Managed Instances.
171162
pub async fn run_concurrent(self) -> Result<(), BoxError> {
172163
if self.concurrency_limit > 1 {
164+
trace!("Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id");
173165
Self::run_concurrent_inner(self.service, self.config, self.client, self.concurrency_limit).await
174166
} else {
175167
let incoming = incoming(&self.client);
@@ -186,72 +178,144 @@ where
186178
) -> Result<(), BoxError> {
187179
let limit = concurrency_limit as usize;
188180

189-
let mut workers = FuturesUnordered::new();
181+
// Use FuturesUnordered so we can observe worker exits as they happen,
182+
// rather than waiting for all workers to finish (join_all).
183+
let mut workers: FuturesUnordered<WorkerJoinFuture> = FuturesUnordered::new();
184+
let spawn_worker = |service: S, config: Arc<Config>, client: Arc<ApiClient>| -> WorkerJoinFuture {
185+
let handle = tokio::spawn(concurrent_worker_loop(service, config, client));
186+
let task_id = handle.id();
187+
Box::pin(async move { (task_id, handle.await) })
188+
};
189+
// Spawn one worker per concurrency slot; the last uses the owned service to avoid an extra clone.
190190
for _ in 1..limit {
191-
workers.push(tokio::spawn(concurrent_worker_loop(
192-
service.clone(),
193-
config.clone(),
194-
client.clone(),
195-
)));
191+
workers.push(spawn_worker(service.clone(), config.clone(), client.clone()));
196192
}
197-
workers.push(tokio::spawn(concurrent_worker_loop(service, config, client)));
193+
workers.push(spawn_worker(service, config, client));
198194

199-
// Track the first infrastructure error. A single worker failing should
200-
// not terminate the whole runtime (LMI keeps running with the remaining
195+
// Track worker exits across tasks. A single worker failing should not
196+
// terminate the whole runtime (LMI keeps running with the remaining
201197
// healthy workers). We only return an error once there are no workers
202198
// left (i.e., we cannot keep at least 1 worker alive).
203199
//
204200
// Note: Handler errors (Err returned from user code) do NOT trigger this.
205201
// They are reported to Lambda via /invocation/{id}/error and the worker
206202
// continues. This only captures unrecoverable runtime failures like
207203
// API client failures, runtime panics, etc.
208-
let mut first_error: Option<BoxError> = None;
204+
let mut errors: Vec<WorkerError> = Vec::new();
209205
let mut remaining_workers = limit;
210-
while let Some(result) = futures::StreamExt::next(&mut workers).await {
206+
while let Some((task_id, result)) = futures::StreamExt::next(&mut workers).await {
211207
remaining_workers = remaining_workers.saturating_sub(1);
212208
match result {
213209
Ok(Ok(())) => {
214210
// `concurrent_worker_loop` runs indefinitely, so an Ok return indicates
215211
// an unexpected worker exit; we still decrement because the task is gone.
216-
warn!(
212+
let clean_exit_msg = "Concurrent worker exited cleanly (unexpected - loop should run forever)";
213+
error!(
214+
task_id = %task_id,
217215
remaining_workers,
218-
"Concurrent worker exited cleanly (unexpected - loop should run forever)"
216+
"{}",
217+
clean_exit_msg
219218
);
220-
if first_error.is_none() {
221-
first_error = Some(Box::new(io::Error::other(
222-
"all concurrent workers exited cleanly (unexpected - loop should run forever)",
223-
)));
224-
}
219+
errors.push(WorkerError::CleanExit(task_id));
225220
}
226221
Ok(Err(err)) => {
227-
error!(error = %err, remaining_workers, "Concurrent worker exited with error");
228-
if first_error.is_none() {
229-
first_error = Some(err);
230-
}
222+
error!(
223+
task_id = %task_id,
224+
error = %err,
225+
remaining_workers,
226+
"Concurrent worker exited with error"
227+
);
228+
errors.push(WorkerError::Failure(task_id, err));
231229
}
232230
Err(join_err) => {
233231
let err: BoxError = Box::new(join_err);
234-
error!(error = %err, remaining_workers, "Concurrent worker panicked");
235-
if first_error.is_none() {
236-
first_error = Some(err);
237-
}
232+
error!(
233+
task_id = %task_id,
234+
error = %err,
235+
remaining_workers,
236+
"Concurrent worker panicked"
237+
);
238+
errors.push(WorkerError::Failure(task_id, err));
238239
}
239240
}
240241
}
241242

242-
match first_error {
243-
Some(err) => Err(err),
244-
None => Ok(()),
243+
match errors.len() {
244+
0 => Ok(()),
245+
_ => Err(Box::new(ConcurrentWorkerErrors { errors })),
246+
}
247+
}
248+
}
249+
250+
/// Why a concurrent worker task stopped, tagged with the ID of the Tokio task that ran it.
#[derive(Debug)]
251+
enum WorkerError {
252+
/// The worker loop returned `Ok(())`; recorded as an error because the loop is expected to run forever.
CleanExit(tokio::task::Id),
253+
/// The worker loop returned an error, or awaiting its join handle failed (logged as a panic).
Failure(tokio::task::Id, BoxError),
254+
}
255+
256+
/// Value produced when a spawned worker finishes: the task ID paired with the join result.
/// The outer `Result` is the outcome of awaiting the join handle; the inner one is the
/// worker loop's own return value.
type WorkerJoinResult = (tokio::task::Id, Result<Result<(), BoxError>, tokio::task::JoinError>);
257+
/// Boxed, `'static` future resolving to a `WorkerJoinResult`; the item type stored in the
/// supervisor's `FuturesUnordered`.
type WorkerJoinFuture = BoxFuture<'static, WorkerJoinResult>;
258+
259+
/// Aggregate error returned by the concurrent supervisor after its worker stream is
/// exhausted and at least one worker exit was recorded.
#[derive(Debug)]
260+
struct ConcurrentWorkerErrors {
261+
/// One entry per worker exit, in the order the supervisor observed them.
errors: Vec<WorkerError>,
262+
}
263+
264+
// Human-readable summary of every recorded worker exit.
//
// Two output shapes:
// - only clean exits: "all concurrent workers exited cleanly (...)" followed by " [task ID]" per worker;
// - otherwise: "concurrent workers exited unexpectedly", then an optional "; clean exits:" list
//   and an optional "; failures:" list with each task ID and its error's `Display` text.
impl fmt::Display for ConcurrentWorkerErrors {
265+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266+
// Partition the recorded exits by variant; both vectors hold borrows into `self.errors`.
let mut clean = Vec::new();
267+
let mut failures = Vec::new();
268+
for error in &self.errors {
269+
match error {
270+
WorkerError::CleanExit(task_id) => clean.push(task_id),
271+
WorkerError::Failure(task_id, err) => failures.push((task_id, err)),
272+
}
273+
}
274+
275+
// Special case: every recorded exit was a clean one. Emit the dedicated message and stop.
if failures.is_empty() && !clean.is_empty() {
276+
write!(
277+
f,
278+
"all concurrent workers exited cleanly (unexpected - loop should run forever)"
279+
)?;
280+
for task_id in clean {
281+
write!(f, " [task {task_id}]")?;
282+
}
283+
return Ok(());
284+
}
285+
286+
// General case: generic header, then whichever of the two lists is non-empty.
write!(f, "concurrent workers exited unexpectedly")?;
287+
if !clean.is_empty() {
288+
write!(f, "; clean exits:")?;
289+
for task_id in clean {
290+
write!(f, " [task {task_id}]")?;
291+
}
292+
}
293+
if !failures.is_empty() {
294+
write!(f, "; failures:")?;
295+
for (task_id, err) in failures {
296+
write!(f, " [task {task_id}] {err}")?;
297+
}
245298
}
299+
Ok(())
246300
}
247301
}
248302

303+
// Empty impl: only the trait's default methods are used, so no underlying `source()` is exposed.
impl std::error::Error for ConcurrentWorkerErrors {}
304+
249305
impl<S> Runtime<S>
250306
where
251307
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
252308
{
253309
/// Start the runtime and begin polling for events on the Lambda Runtime API.
310+
///
311+
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable
312+
/// concurrent polling. Use [`Runtime::run_concurrent`] instead.
254313
pub async fn run(self) -> Result<(), BoxError> {
314+
if let Some(raw) = concurrency_env_value() {
315+
return Err(Box::new(io::Error::other(format!(
316+
"AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but Runtime::run does not support concurrent polling; use Runtime::run_concurrent instead"
317+
))));
318+
}
255319
let incoming = incoming(&self.client);
256320
Self::run_with_incoming(self.service, self.config, incoming).await
257321
}
@@ -332,21 +396,29 @@ fn max_concurrency_from_env() -> Option<u32> {
332396
.filter(|&c| c > 0)
333397
}
334398

399+
/// Returns the raw value of the `AWS_LAMBDA_MAX_CONCURRENCY` environment variable.
///
/// Yields `None` whenever `env::var` fails (the variable is unset or is not valid Unicode).
/// The string is returned as-is: no parsing or validation happens here, so an empty or
/// non-numeric value still yields `Some`.
fn concurrency_env_value() -> Option<String> {
400+
env::var("AWS_LAMBDA_MAX_CONCURRENCY").ok()
401+
}
402+
335403
async fn concurrent_worker_loop<S>(mut service: S, config: Arc<Config>, client: Arc<ApiClient>) -> Result<(), BoxError>
336404
where
337405
S: Service<LambdaInvocation, Response = (), Error = BoxError>,
338406
S::Future: Send,
339407
{
408+
let task_id = tokio::task::id();
409+
let span = info_span!("concurrent_worker_loop", task_id = %task_id);
340410
loop {
341411
let event = match next_event_future(client.clone()).await {
342412
Ok(event) => event,
343413
Err(e) => {
344-
warn!(error = %e, "Error polling /next, retrying");
414+
warn!(task_id = %task_id, error = %e, "Error polling /next, retrying");
345415
continue;
346416
}
347417
};
348418

349-
process_invocation(&mut service, &config, event, false).await?;
419+
process_invocation(&mut service, &config, event, false)
420+
.instrument(span.clone())
421+
.await?;
350422
}
351423
}
352424

@@ -378,11 +450,6 @@ where
378450
if set_amzn_trace_env {
379451
// Setup Amazon's default tracing data
380452
amzn_trace_env(&invocation.context);
381-
} else {
382-
// Inform users that X-Ray is available via context, not env var, in concurrent mode.
383-
XRAY_LOGGED.get_or_init(|| {
384-
trace!("Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id");
385-
});
386453
}
387454

388455
// Wait for service to be ready

0 commit comments

Comments
 (0)