Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use libdd_trace_protobuf::pb::Span;
use serde_json::Value;
use tracing::debug;
use tracing::{debug, warn};

#[derive(Debug, Clone, PartialEq)]
pub struct Context {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl ContextBuffer {
{
return self.buffer.remove(i);
}
debug!("Context for request_id: {:?} not found", request_id);
warn!("Context for request_id: {:?} not found", request_id);

None
}
Expand Down
101 changes: 91 additions & 10 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use libdd_trace_protobuf::pb::Span;
use libdd_trace_utils::tracer_header_tags;
use serde_json::Value;
use tokio::time::Instant;
use tracing::{debug, warn};
use tracing::{debug, trace, warn};

use crate::{
config::{self, aws::AwsConfig},
Expand Down Expand Up @@ -285,7 +285,10 @@ impl Processor {
/// This is used to create a cold start span, since this telemetry event does not
/// provide a `request_id`, we try to guess which invocation is the cold start.
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) {
if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) {
if runtime_version
.as_deref()
.is_some_and(|rv| rv.contains("DurableFunction"))
{
self.enhanced_metrics.set_durable_function_tag();
}
let start_time: i64 = SystemTime::from(time)
Expand Down Expand Up @@ -768,6 +771,14 @@ impl Processor {
self.enhanced_metrics
.set_cpu_time_enhanced_metrics(offsets.cpu_offset.clone());
}

// Release the context now that all processing for this invocation is complete.
// This prevents unbounded memory growth across warm invocations.
self.context_buffer.remove(request_id);
trace!(
"Context released (buffer size after remove: {})",
self.context_buffer.size()
);
}

/// Handles Managed Instance mode platform report processing.
Expand Down Expand Up @@ -1609,8 +1620,7 @@ mod tests {
duration_ms,
status,
error_type,
should_have_context_after,
): (&str, bool, f64, Status, Option<String>, bool) = $value;
): (&str, bool, f64, Status, Option<String>) = $value;

let mut processor = setup();

Expand Down Expand Up @@ -1652,6 +1662,17 @@ mod tests {
stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)),
});

// Verify context state before on_platform_report
let request_id_string_for_get = request_id.to_string();
assert_eq!(
processor.context_buffer.get(&request_id_string_for_get).is_some(),
// Use setup_context because it dictates whether the request is handled up front,
// which in turn signals whether the request is valid/processed.
setup_context,
"Context existence mismatch for request_id: {}",
request_id
);

// Call on_platform_report
let request_id_string = request_id.to_string();
processor.on_platform_report(
Expand All @@ -1669,7 +1690,7 @@ mod tests {
let request_id_string_for_get = request_id.to_string();
assert_eq!(
processor.context_buffer.get(&request_id_string_for_get).is_some(),
should_have_context_after,
false,
"Context existence mismatch for request_id: {}",
request_id
);
Expand All @@ -1679,14 +1700,13 @@ mod tests {
}

platform_report_managed_instance_tests! {
// (request_id, setup_context, duration_ms, status, error_type, should_have_context_after)
// (request_id, setup_context, duration_ms, status, error_type)
test_on_platform_report_managed_instance_mode_with_valid_context: (
"test-request-id",
true, // setup context
123.45,
Status::Success,
None,
true, // context should still exist
),

test_on_platform_report_managed_instance_mode_without_context: (
Expand All @@ -1695,7 +1715,6 @@ mod tests {
123.45,
Status::Success,
None,
false, // context should not exist
),

test_on_platform_report_managed_instance_mode_with_error_status: (
Expand All @@ -1704,7 +1723,6 @@ mod tests {
200.0,
Status::Error,
Some("RuntimeError".to_string()),
true, // context should still exist
),

test_on_platform_report_managed_instance_mode_with_timeout: (
Expand All @@ -1713,10 +1731,73 @@ mod tests {
30000.0,
Status::Timeout,
None,
true, // context should still exist
),
}

#[tokio::test]
async fn test_context_removed_after_on_platform_report_on_demand() {
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;

let mut p = setup();
let request_id = String::from("test-request-id");

p.on_invoke_event(request_id.clone());
let start_time = chrono::Utc::now();
p.on_platform_start(request_id.clone(), start_time);
assert!(
p.context_buffer.get(&request_id).is_some(),
"context must exist before report"
);

let config = Arc::new(config::Config {
service: Some("test-service".to_string()),
..config::Config::default()
});
let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));
let (stats_concentrator_service, stats_concentrator_handle) =
StatsConcentratorService::new(Arc::clone(&config));
tokio::spawn(stats_concentrator_service.run());
let trace_sender = Arc::new(SendingTraceProcessor {
appsec: None,
processor: Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
),
}),
trace_tx: tokio::sync::mpsc::channel(1).0,
stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)),
});

p.on_platform_report(
&request_id,
ReportMetrics::OnDemand(OnDemandReportMetrics {
duration_ms: 123.45,
billed_duration_ms: 124,
memory_size_mb: 256,
max_memory_used_mb: 128,
init_duration_ms: None,
restore_duration_ms: None,
}),
chrono::Utc::now().timestamp(),
Status::Success,
None,
None,
tags_provider,
trace_sender,
)
.await;

assert!(
p.context_buffer.get(&request_id).is_none(),
"context must be removed after on_platform_report completes"
);
assert_eq!(p.context_buffer.size(), 0);
}

#[tokio::test]
async fn test_on_platform_init_start_sets_durable_function_tag() {
let mut processor = setup();
Expand Down
Loading