Skip to content

Commit 0ebdbde

Browse files
test: verify request-ID isolation in concurrent exec (#1086)
* test: add request-ID isolation in concurrent exec Adds test to ensure request IDs are properly isolated in concurrent Lambda execution with structured logging. Verifies each concurrent invocation maintains unique request ID context.
1 parent c56dcce commit 0ebdbde

2 files changed

Lines changed: 174 additions & 0 deletions

File tree

lambda-runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ idna_adapter = "=1.2.0"
7373
lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] }
7474
pin-project-lite = { workspace = true }
7575
tracing-appender = "0.2"
76+
tracing-capture = "0.1.0"
77+
tracing-subscriber = { version = "0.3", features = ["registry"] }
7678

7779
[package.metadata.docs.rs]
7880
all-features = true

lambda-runtime/src/runtime.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,4 +908,176 @@ mod endpoint_tests {
908908
server_handle.abort();
909909
Ok(())
910910
}
911+
912+
#[tokio::test]
913+
#[cfg(feature = "experimental-concurrency")]
914+
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
915+
use std::collections::HashSet;
916+
use tracing::info;
917+
use tracing_capture::{CaptureLayer, SharedStorage};
918+
use tracing_subscriber::layer::SubscriberExt;
919+
920+
let storage = SharedStorage::default();
921+
let subscriber = tracing_subscriber::registry().with(CaptureLayer::new(&storage));
922+
tracing::subscriber::set_global_default(subscriber).unwrap();
923+
924+
let request_count = Arc::new(AtomicUsize::new(0));
925+
let done = Arc::new(tokio::sync::Notify::new());
926+
let listener = TcpListener::bind("127.0.0.1:0").await?;
927+
let addr = listener.local_addr()?;
928+
let base: http::Uri = format!("http://{addr}").parse()?;
929+
930+
let server_handle = {
931+
let request_count = request_count.clone();
932+
let done = done.clone();
933+
tokio::spawn(async move {
934+
loop {
935+
let (tcp, _) = match listener.accept().await {
936+
Ok(v) => v,
937+
Err(_) => return,
938+
};
939+
940+
let request_count = request_count.clone();
941+
let done = done.clone();
942+
let service = service_fn(move |req: Request<Incoming>| {
943+
let request_count = request_count.clone();
944+
let done = done.clone();
945+
async move {
946+
let (parts, body) = req.into_parts();
947+
if parts.method == Method::POST {
948+
let _ = body.collect().await;
949+
}
950+
951+
if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next"
952+
{
953+
let count = request_count.fetch_add(1, Ordering::SeqCst);
954+
if count < 300 {
955+
let request_id = format!("test-request-{}", count + 1);
956+
let res = Response::builder()
957+
.status(StatusCode::OK)
958+
.header("lambda-runtime-aws-request-id", &request_id)
959+
.header("lambda-runtime-deadline-ms", "9999999999999")
960+
.body(Full::new(Bytes::from_static(b"{}")))
961+
.unwrap();
962+
return Ok::<_, Infallible>(res);
963+
} else {
964+
done.notify_one();
965+
let res = Response::builder()
966+
.status(StatusCode::NO_CONTENT)
967+
.body(Full::new(Bytes::new()))
968+
.unwrap();
969+
return Ok::<_, Infallible>(res);
970+
}
971+
}
972+
973+
if parts.method == Method::POST && parts.uri.path().contains("/response") {
974+
let res = Response::builder()
975+
.status(StatusCode::OK)
976+
.body(Full::new(Bytes::new()))
977+
.unwrap();
978+
return Ok::<_, Infallible>(res);
979+
}
980+
981+
let res = Response::builder()
982+
.status(StatusCode::NOT_FOUND)
983+
.body(Full::new(Bytes::new()))
984+
.unwrap();
985+
Ok::<_, Infallible>(res)
986+
}
987+
});
988+
989+
let io = TokioIo::new(tcp);
990+
tokio::spawn(async move {
991+
let _ = ServerBuilder::new(TokioExecutor::new())
992+
.serve_connection(io, service)
993+
.await;
994+
});
995+
}
996+
})
997+
};
998+
999+
async fn test_handler(event: crate::LambdaEvent<serde_json::Value>) -> Result<(), Error> {
1000+
let request_id = &event.context.request_id;
1001+
info!(observed_request_id = request_id);
1002+
tokio::time::sleep(Duration::from_millis(100)).await;
1003+
Ok(())
1004+
}
1005+
1006+
let handler = crate::service_fn(test_handler);
1007+
let client = Arc::new(Client::builder().with_endpoint(base).build()?);
1008+
1009+
// Add tracing layer to capture span fields
1010+
use crate::layers::trace::TracingLayer;
1011+
use tower::ServiceBuilder;
1012+
let service = ServiceBuilder::new()
1013+
.layer(TracingLayer::new())
1014+
.service(wrap_handler(handler, client.clone()));
1015+
1016+
let runtime = Runtime {
1017+
client: client.clone(),
1018+
config: Arc::new(Config {
1019+
function_name: "test_fn".to_string(),
1020+
memory: 128,
1021+
version: "1".to_string(),
1022+
log_stream: "test_stream".to_string(),
1023+
log_group: "test_log".to_string(),
1024+
}),
1025+
service,
1026+
concurrency_limit: 3,
1027+
};
1028+
1029+
let runtime_handle = tokio::spawn(async move { runtime.run_concurrent().await });
1030+
1031+
done.notified().await;
1032+
// Give handlers time to complete after server signals done
1033+
tokio::time::sleep(Duration::from_millis(500)).await;
1034+
1035+
runtime_handle.abort();
1036+
server_handle.abort();
1037+
1038+
let storage = storage.lock();
1039+
let events: Vec<_> = storage
1040+
.all_events()
1041+
.filter(|e| e.value("observed_request_id").is_some())
1042+
.collect();
1043+
1044+
assert!(
1045+
events.len() >= 300,
1046+
"Should have at least 300 log entries, got {}",
1047+
events.len()
1048+
);
1049+
1050+
let mut seen_ids = HashSet::new();
1051+
for event in &events {
1052+
let observed_id = event["observed_request_id"].as_str().unwrap();
1053+
1054+
// Find the parent "Lambda runtime invoke" span and get its requestId
1055+
let span_request_id = event
1056+
.ancestors()
1057+
.find(|s| s.metadata().name() == "Lambda runtime invoke")
1058+
.and_then(|s| s.value("requestId"))
1059+
.and_then(|v| v.as_str())
1060+
.expect("Event should have a Lambda runtime invoke ancestor with requestId");
1061+
1062+
assert!(
1063+
observed_id.starts_with("test-request-"),
1064+
"Request ID should match pattern: {}",
1065+
observed_id
1066+
);
1067+
assert!(
1068+
seen_ids.insert(observed_id.to_string()),
1069+
"Request ID should be unique: {}",
1070+
observed_id
1071+
);
1072+
1073+
// Verify span request ID matches logged request ID
1074+
assert_eq!(
1075+
observed_id, span_request_id,
1076+
"Span request ID should match logged request ID: span={}, logged={}",
1077+
span_request_id, observed_id
1078+
);
1079+
}
1080+
1081+
Ok(())
1082+
}
9111083
}

0 commit comments

Comments
 (0)