From 03f60b4ee052f470037df2d21429a484b5271140 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Thu, 26 Feb 2026 13:51:04 -0500 Subject: [PATCH] fix(lifecycle): release invocation context after platform report to prevent memory leak Contexts added to ContextBuffer on every invocation were never removed after processing, causing unbounded memory growth across warm invocations. The growth was most visible when DD_CAPTURE_LAMBDA_PAYLOAD=true with large response payloads (issue #1049), but affects all invocations. Remove the context at the end of on_platform_report, which is the last point in the lifecycle where downstream code still reads context fields (runtime_duration_ms for post-runtime metrics, enhanced_metric_data for network/CPU metrics). Both on-demand and managed instance paths are fixed. Co-Authored-By: Claude Sonnet 4.6 --- bottlecap/Cargo.lock | 2 +- bottlecap/src/lifecycle/invocation/context.rs | 4 +- .../src/lifecycle/invocation/processor.rs | 101 ++++++++++++++++-- 3 files changed, 94 insertions(+), 13 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 41a6272da..bb6f5d25d 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -525,7 +525,7 @@ dependencies = [ "tokio", "tokio-util", "tonic-types", - "tower 0.5.3", + "tower", "tower-http", "tracing", "tracing-core", diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 321540168..2c72786fd 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -9,7 +9,7 @@ use std::{ use libdd_trace_protobuf::pb::Span; use serde_json::Value; -use tracing::debug; +use tracing::{debug, warn}; #[derive(Debug, Clone, PartialEq)] pub struct Context { @@ -199,7 +199,7 @@ impl ContextBuffer { { return self.buffer.remove(i); } - debug!("Context for request_id: {:?} not found", request_id); + warn!("Context for request_id: {:?} not found", request_id); None } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ccb9f497e..b80798e0e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -9,7 +9,7 @@ use libdd_trace_protobuf::pb::Span; use libdd_trace_utils::tracer_header_tags; use serde_json::Value; use tokio::time::Instant; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use crate::{ config::{self, aws::AwsConfig}, @@ -285,7 +285,10 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. pub fn on_platform_init_start(&mut self, time: DateTime, runtime_version: Option) { - if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) { + if runtime_version + .as_deref() + .is_some_and(|rv| rv.contains("DurableFunction")) + { self.enhanced_metrics.set_durable_function_tag(); } let start_time: i64 = SystemTime::from(time) @@ -768,6 +771,14 @@ impl Processor { self.enhanced_metrics .set_cpu_time_enhanced_metrics(offsets.cpu_offset.clone()); } + + // Release the context now that all processing for this invocation is complete. + // This prevents unbounded memory growth across warm invocations. + self.context_buffer.remove(request_id); + trace!( + "Context released (buffer size after remove: {})", + self.context_buffer.size() + ); } /// Handles Managed Instance mode platform report processing. @@ -1609,8 +1620,7 @@ mod tests { duration_ms, status, error_type, - should_have_context_after, - ): (&str, bool, f64, Status, Option, bool) = $value; + ): (&str, bool, f64, Status, Option) = $value; let mut processor = setup(); @@ -1652,6 +1662,17 @@ mod tests { stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), }); + // Verify context state before on_platform_report + let request_id_string_for_get = request_id.to_string(); + assert_eq!( + processor.context_buffer.get(&request_id_string_for_get).is_some(), + // Use setup_context because it dictates whether the request is handled up front, + // which in turn signals whether the request is valid/processed. + setup_context, + "Context existence mismatch for request_id: {}", + request_id + ); + // Call on_platform_report let request_id_string = request_id.to_string(); processor.on_platform_report( @@ -1669,7 +1690,7 @@ mod tests { let request_id_string_for_get = request_id.to_string(); assert_eq!( processor.context_buffer.get(&request_id_string_for_get).is_some(), - should_have_context_after, + false, "Context existence mismatch for request_id: {}", request_id ); @@ -1679,14 +1700,13 @@ mod tests { } platform_report_managed_instance_tests! { - // (request_id, setup_context, duration_ms, status, error_type, should_have_context_after) + // (request_id, setup_context, duration_ms, status, error_type) test_on_platform_report_managed_instance_mode_with_valid_context: ( "test-request-id", true, // setup context 123.45, Status::Success, None, - true, // context should still exist ), test_on_platform_report_managed_instance_mode_without_context: ( @@ -1695,7 +1715,6 @@ mod tests { 123.45, Status::Success, None, - false, // context should not exist ), test_on_platform_report_managed_instance_mode_with_error_status: ( @@ -1704,7 +1723,6 @@ mod tests { 200.0, Status::Error, Some("RuntimeError".to_string()), - true, // context should still exist ), test_on_platform_report_managed_instance_mode_with_timeout: ( @@ -1713,10 +1731,73 @@ mod tests { 30000.0, Status::Timeout, None, - true, // context should still exist ), } + #[tokio::test] + async fn test_context_removed_after_on_platform_report_on_demand() { + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + + let mut p = setup(); + let request_id = String::from("test-request-id"); + + p.on_invoke_event(request_id.clone()); + let start_time = chrono::Utc::now(); + p.on_platform_start(request_id.clone(), start_time); + assert!( + p.context_buffer.get(&request_id).is_some(), + "context must exist before report" + ); + + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + ..config::Config::default() + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(stats_concentrator_service.run()); + let trace_sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx: tokio::sync::mpsc::channel(1).0, + stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), + }); + + p.on_platform_report( + &request_id, + ReportMetrics::OnDemand(OnDemandReportMetrics { + duration_ms: 123.45, + billed_duration_ms: 124, + memory_size_mb: 256, + max_memory_used_mb: 128, + init_duration_ms: None, + restore_duration_ms: None, + }), + chrono::Utc::now().timestamp(), + Status::Success, + None, + None, + tags_provider, + trace_sender, + ) + .await; + + assert!( + p.context_buffer.get(&request_id).is_none(), + "context must be removed after on_platform_report completes" + ); + assert_eq!(p.context_buffer.size(), 0); + } + #[tokio::test] async fn test_on_platform_init_start_sets_durable_function_tag() { let mut processor = setup();