diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 41a6272da..bb6f5d25d 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -525,7 +525,7 @@ dependencies = [ "tokio", "tokio-util", "tonic-types", - "tower 0.5.3", + "tower", "tower-http", "tracing", "tracing-core", diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 321540168..2c72786fd 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -9,7 +9,7 @@ use std::{ use libdd_trace_protobuf::pb::Span; use serde_json::Value; -use tracing::debug; +use tracing::{debug, warn}; #[derive(Debug, Clone, PartialEq)] pub struct Context { @@ -199,7 +199,7 @@ impl ContextBuffer { { return self.buffer.remove(i); } - debug!("Context for request_id: {:?} not found", request_id); + warn!("Context for request_id: {:?} not found", request_id); None } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ccb9f497e..b80798e0e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -9,7 +9,7 @@ use libdd_trace_protobuf::pb::Span; use libdd_trace_utils::tracer_header_tags; use serde_json::Value; use tokio::time::Instant; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use crate::{ config::{self, aws::AwsConfig}, @@ -285,7 +285,10 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. pub fn on_platform_init_start(&mut self, time: DateTime, runtime_version: Option) { - if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) { + if runtime_version + .as_deref() + .is_some_and(|rv| rv.contains("DurableFunction")) + { self.enhanced_metrics.set_durable_function_tag(); } let start_time: i64 = SystemTime::from(time) @@ -768,6 +771,14 @@ impl Processor { self.enhanced_metrics .set_cpu_time_enhanced_metrics(offsets.cpu_offset.clone()); } + + // Release the context now that all processing for this invocation is complete. + // This prevents unbounded memory growth across warm invocations. + self.context_buffer.remove(request_id); + trace!( + "Context released (buffer size after remove: {})", + self.context_buffer.size() + ); } /// Handles Managed Instance mode platform report processing. @@ -1609,8 +1620,7 @@ mod tests { duration_ms, status, error_type, - should_have_context_after, - ): (&str, bool, f64, Status, Option, bool) = $value; + ): (&str, bool, f64, Status, Option) = $value; let mut processor = setup(); @@ -1652,6 +1662,17 @@ mod tests { stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), }); + // Verify context state before on_platform_report + let request_id_string_for_get = request_id.to_string(); + assert_eq!( + processor.context_buffer.get(&request_id_string_for_get).is_some(), + // Use setup_context because it dictates whether the request is handled up front, + // which in turn signals whether the request is valid/processed. + setup_context, + "Context existence mismatch for request_id: {}", + request_id + ); + // Call on_platform_report let request_id_string = request_id.to_string(); processor.on_platform_report( @@ -1669,7 +1690,7 @@ mod tests { let request_id_string_for_get = request_id.to_string(); assert_eq!( processor.context_buffer.get(&request_id_string_for_get).is_some(), - should_have_context_after, + false, "Context existence mismatch for request_id: {}", request_id ); @@ -1679,14 +1700,13 @@ mod tests { } platform_report_managed_instance_tests! { - // (request_id, setup_context, duration_ms, status, error_type, should_have_context_after) + // (request_id, setup_context, duration_ms, status, error_type) test_on_platform_report_managed_instance_mode_with_valid_context: ( "test-request-id", true, // setup context 123.45, Status::Success, None, - true, // context should still exist ), test_on_platform_report_managed_instance_mode_without_context: ( @@ -1695,7 +1715,6 @@ mod tests { 123.45, Status::Success, None, - false, // context should not exist ), test_on_platform_report_managed_instance_mode_with_error_status: ( @@ -1704,7 +1723,6 @@ mod tests { 200.0, Status::Error, Some("RuntimeError".to_string()), - true, // context should still exist ), test_on_platform_report_managed_instance_mode_with_timeout: ( @@ -1713,10 +1731,73 @@ mod tests { 30000.0, Status::Timeout, None, - true, // context should still exist ), } + #[tokio::test] + async fn test_context_removed_after_on_platform_report_on_demand() { + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + + let mut p = setup(); + let request_id = String::from("test-request-id"); + + p.on_invoke_event(request_id.clone()); + let start_time = chrono::Utc::now(); + p.on_platform_start(request_id.clone(), start_time); + assert!( + p.context_buffer.get(&request_id).is_some(), + "context must exist before report" + ); + + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + ..config::Config::default() + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(stats_concentrator_service.run()); + let trace_sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx: tokio::sync::mpsc::channel(1).0, + stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), + }); + + p.on_platform_report( + &request_id, + ReportMetrics::OnDemand(OnDemandReportMetrics { + duration_ms: 123.45, + billed_duration_ms: 124, + memory_size_mb: 256, + max_memory_used_mb: 128, + init_duration_ms: None, + restore_duration_ms: None, + }), + chrono::Utc::now().timestamp(), + Status::Success, + None, + None, + tags_provider, + trace_sender, + ) + .await; + + assert!( + p.context_buffer.get(&request_id).is_none(), + "context must be removed after on_platform_report completes" + ); + assert_eq!(p.context_buffer.size(), 0); + } + #[tokio::test] async fn test_on_platform_init_start_sets_durable_function_tag() { let mut processor = setup();