From 3edfbf35baa6e9520325e72611b39bf9d498e91f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 27 Mar 2025 19:36:55 -0400 Subject: [PATCH 1/4] remove lock on `context_buffer` replaces sharing the `ContextBuffer`, now we directly access the `InvocationProcessor` to add the tracer top level span --- bottlecap/src/bin/bottlecap/main.rs | 11 ++--- .../src/lifecycle/invocation/processor.rs | 48 +++++++++---------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 50e1d01ac..c232678fd 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -15,8 +15,7 @@ use bottlecap::{ event_bus::bus::EventBus, events::Event, lifecycle::{ - flush_control::FlushControl, - invocation::{context::ContextBuffer, processor::Processor as InvocationProcessor}, + flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor, listener::Listener as LifecycleListener, }, logger, @@ -330,20 +329,18 @@ async fn extension_loop_active( let mut metrics_flusher = start_metrics_flusher(resolved_api_key.clone(), &metrics_aggr, config); // Lifecycle Invocation Processor - let context_buffer = Arc::new(Mutex::new(ContextBuffer::default())); let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new( Arc::clone(&tags_provider), Arc::clone(config), aws_config, Arc::clone(&metrics_aggr), - Arc::clone(&context_buffer), ))); let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) = start_trace_agent( config, resolved_api_key.clone(), &tags_provider, - context_buffer.clone(), + Arc::clone(&invocation_processor), ); let lifecycle_listener = LifecycleListener { @@ -695,7 +692,7 @@ fn start_trace_agent( config: &Arc, resolved_api_key: String, tags_provider: &Arc, - context_buffer: Arc>, + invocation_processor: Arc>, ) -> ( Sender, Arc, @@ -739,8 +736,8 @@ fn start_trace_agent( trace_processor.clone(), stats_aggregator, stats_processor, + invocation_processor, Arc::clone(tags_provider), - context_buffer, resolved_api_key, )); let trace_agent_channel = trace_agent.get_sender_copy(); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index c0d0bc3c0..0419f0253 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -51,7 +51,7 @@ const TAG_SAMPLING_PRIORITY: &str = "_sampling_priority_v1"; pub struct Processor { // Buffer containing context of the previous 5 invocations - context_buffer: Arc>, + context_buffer: ContextBuffer, // Helper to infer span information inferrer: SpanInferrer, // Current invocation span @@ -79,7 +79,6 @@ impl Processor { config: Arc, aws_config: &AwsConfig, metrics_aggregator: Arc>, - context_buffer: Arc>, ) -> Self { let service = config.service.clone().unwrap_or(String::from("aws.lambda")); let resource = tags_provider @@ -89,7 +88,7 @@ impl Processor { let propagator = DatadogCompositePropagator::new(Arc::clone(&config)); Processor { - context_buffer, + context_buffer: ContextBuffer::default(), inferrer: SpanInferrer::new(config.service_mapping.clone()), span: create_empty_span(String::from("aws.lambda"), resource, service), cold_start_span: None, @@ -115,8 +114,7 @@ impl Processor { self.reset_state(); self.set_init_tags(); - let mut context_buffer = self.context_buffer.lock().expect("lock poisoned"); - context_buffer.create_context(&request_id); + self.context_buffer.create_context(&request_id); if self.config.enhanced_metrics { // Collect offsets for network and cpu metrics let network_offset: Option = proc::get_network_data().ok(); @@ -139,9 +137,9 @@ impl Processor { tmp_chan_tx, process_chan_tx, }); - context_buffer.add_enhanced_metric_data(&request_id, enhanced_metric_offsets); + self.context_buffer + .add_enhanced_metric_data(&request_id, enhanced_metric_offsets); } - drop(context_buffer); // Increment the invocation metric self.enhanced_metrics.increment_invocation_metric(timestamp); @@ -173,8 +171,7 @@ impl Processor { let mut cold_start = false; // If it's empty, then we are in a cold start - let context_buffer = self.context_buffer.lock().expect("lock poisoned"); - if context_buffer.is_empty() { + if self.context_buffer.is_empty() { let now = Instant::now(); let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time); if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() { @@ -188,7 +185,6 @@ impl Processor { self.enhanced_metrics.set_runtime_tag(&runtime); self.runtime = Some(runtime); } - drop(context_buffer); if proactive_initialization { self.span.meta.insert( @@ -256,8 +252,7 @@ impl Processor { .as_nanos() .try_into() .unwrap_or_default(); - let mut context_buffer = self.context_buffer.lock().expect("lock poisoned"); - context_buffer.add_start_time(&request_id, start_time); + self.context_buffer.add_start_time(&request_id, start_time); self.span.start = start_time; } @@ -302,9 +297,9 @@ impl Processor { { // Scope to drop the lock on context_buffer - let mut context_buffer = self.context_buffer.lock().expect("lock poisoned"); - context_buffer.add_runtime_duration(request_id, metrics.duration_ms); - if let Some(context) = context_buffer.get(request_id) { + self.context_buffer + .add_runtime_duration(request_id, metrics.duration_ms); + if let Some(context) = self.context_buffer.get(request_id) { // `round` is intentionally meant to be a whole integer self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; self.span @@ -385,7 +380,6 @@ impl Processor { vec![traces], body_size, self.inferrer.span_pointers.clone(), - self.context_buffer.clone(), ); if let Err(e) = trace_agent_tx.send(send_data).await { @@ -410,8 +404,7 @@ impl Processor { self.enhanced_metrics .set_report_log_metrics(&metrics, timestamp); - let context_buffer = self.context_buffer.lock().expect("lock poisoned"); - if let Some(context) = context_buffer.get(request_id) { + if let Some(context) = self.context_buffer.get(request_id) { if context.runtime_duration_ms != 0.0 { let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms; @@ -655,6 +648,17 @@ impl Processor { pub fn on_out_of_memory_error(&mut self, timestamp: i64) { self.enhanced_metrics.increment_oom_metric(timestamp); } + + /// Add a tracer span to the context buffer for the given request_id, if present. + /// + /// This is used to enrich the invocation span with additional metadata from the tracers + /// top level span, since we discard the tracer span when we create the invocation span. + pub fn add_tracer_span(&mut self, tracer_top_level_span: &Span) { + if let Some(request_id) = tracer_top_level_span.meta.get("request_id") { + self.context_buffer + .add_tracer_span(request_id, Some(tracer_top_level_span.clone())); + } + } } #[cfg(test)] @@ -693,13 +697,7 @@ mod tests { Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"), )); - Processor::new( - tags_provider, - config, - &aws_config, - metrics_aggregator, - Arc::new(Mutex::new(ContextBuffer::default())), - ) + Processor::new(tags_provider, config, &aws_config, metrics_aggregator) } #[test] From 5d25af6576d57a803fa201412c1cb53153c2790f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 27 Mar 2025 19:37:22 -0400 Subject: [PATCH 2/4] make `TraceAgent` use `InvocationProcessor` --- bottlecap/src/traces/trace_agent.rs | 38 ++++++++++++++++++----------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 611133066..983b8cc83 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -8,7 +8,6 @@ use serde_json::json; use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::Mutex as SyncMutex; use std::time::Instant; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Mutex; @@ -16,9 +15,11 @@ use tracing::{debug, error}; use crate::config; use crate::http_client; -use crate::lifecycle::invocation::context::ContextBuffer; +use crate::lifecycle::invocation::processor::Processor as InvocationProcessor; use crate::tags::provider; -use crate::traces::{stats_aggregator, stats_processor, trace_aggregator, trace_processor}; +use crate::traces::{ + stats_aggregator, stats_processor, trace_aggregator, trace_processor, INVOCATION_SPAN_RESOURCE, +}; use datadog_trace_mini_agent::http_utils::{ self, log_and_create_http_response, log_and_create_traces_success_http_response, }; @@ -45,7 +46,7 @@ pub struct TraceAgent { pub stats_aggregator: Arc>, pub stats_processor: Arc, pub tags_provider: Arc, - pub context_buffer: Arc>, + invocation_processor: Arc>, http_client: reqwest::Client, api_key: String, tx: Sender, @@ -66,8 +67,8 @@ impl TraceAgent { trace_processor: Arc, stats_aggregator: Arc>, stats_processor: Arc, + invocation_processor: Arc>, tags_provider: Arc, - context_buffer: Arc>, resolved_api_key: String, ) -> TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each @@ -91,8 +92,8 @@ impl TraceAgent { trace_processor, stats_aggregator, stats_processor, + invocation_processor, tags_provider, - context_buffer, http_client: http_client::get_client(config), tx: trace_tx, api_key: resolved_api_key, @@ -123,7 +124,7 @@ impl TraceAgent { let stats_processor = self.stats_processor.clone(); let endpoint_config = self.config.clone(); let tags_provider = self.tags_provider.clone(); - let context_buffer = self.context_buffer.clone(); + let invocation_processor = self.invocation_processor.clone(); let client = self.http_client.clone(); let api_key = self.api_key.clone(); @@ -136,7 +137,7 @@ impl TraceAgent { let endpoint_config = endpoint_config.clone(); let tags_provider = tags_provider.clone(); - let context_buffer = context_buffer.clone(); + let invocation_processor = invocation_processor.clone(); let client = client.clone(); let api_key = api_key.clone(); @@ -148,8 +149,8 @@ impl TraceAgent { trace_tx.clone(), stats_processor.clone(), stats_tx.clone(), + invocation_processor.clone(), tags_provider.clone(), - context_buffer.clone(), client.clone(), api_key.clone(), ) @@ -187,8 +188,8 @@ impl TraceAgent { trace_tx: Sender, stats_processor: Arc, stats_tx: Sender, + invocation_processor: Arc>, tags_provider: Arc, - context_buffer: Arc>, client: reqwest::Client, api_key: String, ) -> http::Result> { @@ -198,9 +199,9 @@ impl TraceAgent { req, trace_processor.clone(), trace_tx, + invocation_processor.clone(), tags_provider, ApiVersion::V04, - context_buffer.clone(), ) .await { @@ -215,9 +216,9 @@ impl TraceAgent { req, trace_processor.clone(), trace_tx, + invocation_processor.clone(), tags_provider, ApiVersion::V05, - context_buffer.clone(), ) .await { @@ -276,9 +277,9 @@ impl TraceAgent { req: Request, trace_processor: Arc, trace_tx: Sender, + invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, - context_buffer: Arc>, ) -> http::Result> { let (parts, body) = req.into_parts(); @@ -313,6 +314,16 @@ impl TraceAgent { }, }; + // Search for trace invocation span and send it to the invocation processor + for chunk in &traces { + for span in chunk { + if span.resource == INVOCATION_SPAN_RESOURCE { + let mut invocation_processor = invocation_processor.lock().await; + invocation_processor.add_tracer_span(span); + } + } + } + let send_data = trace_processor.process_traces( config, tags_provider, @@ -320,7 +331,6 @@ impl TraceAgent { traces, body_size, None, - context_buffer, ); // send trace payload to our trace flusher From 6f87d3f1de29339d4a5fa70e3cb90388b238ce21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 27 Mar 2025 19:37:47 -0400 Subject: [PATCH 3/4] remove dependency on `ContextBuffer` --- bottlecap/src/traces/trace_processor.rs | 38 +++++++------------------ 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index eb752fcdd..36eb48bfd 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config; -use crate::lifecycle::invocation::context::ContextBuffer; use crate::tags::provider; use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer}; use crate::traces::{ @@ -23,7 +22,7 @@ use datadog_trace_utils::tracer_payload::{ }; use ddcommon::Endpoint; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tracing::error; #[derive(Clone)] @@ -37,14 +36,13 @@ struct ChunkProcessor { obfuscation_config: Arc, tags_provider: Arc, span_pointers: Option>, - context_buffer: Arc>, } impl TraceChunkProcessor for ChunkProcessor { fn process(&mut self, chunk: &mut pb::TraceChunk, root_span_index: usize) { - chunk.spans.retain(|span| { - !filter_span_from_lambda_library_or_runtime(span, self.context_buffer.clone()) - }); + chunk + .spans + .retain(|span| !filter_span_from_lambda_library_or_runtime(span)); for span in &mut chunk.spans { // Service name could be incorrectly set to 'aws.lambda' // in datadog lambda libraries @@ -70,10 +68,7 @@ impl TraceChunkProcessor for ChunkProcessor { } } -fn filter_span_from_lambda_library_or_runtime( - span: &Span, - context_buffer: Arc>, -) -> bool { +fn filter_span_from_lambda_library_or_runtime(span: &Span) -> bool { if let Some(url) = span.meta.get("http.url") { if url.starts_with(LAMBDA_RUNTIME_URL_PREFIX) || url.starts_with(LAMBDA_EXTENSION_URL_PREFIX) @@ -107,10 +102,6 @@ fn filter_span_from_lambda_library_or_runtime( } } if span.resource == INVOCATION_SPAN_RESOURCE { - let mut guard = context_buffer.lock().expect("lock poisoned"); - if let Some(request_id) = span.meta.get("request_id") { - guard.add_tracer_span(request_id, Some(span.clone())); - } return true; } @@ -135,7 +126,6 @@ pub trait TraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - context_buffer: Arc>, ) -> SendData; } @@ -148,7 +138,6 @@ impl TraceProcessor for ServerlessTraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - context_buffer: Arc>, ) -> SendData { let mut payload = trace_utils::collect_trace_chunks( V07(traces), @@ -157,7 +146,6 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), span_pointers, - context_buffer, }, true, false, @@ -197,19 +185,16 @@ impl TraceProcessor for ServerlessTraceProcessor { #[cfg(test)] mod tests { - use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; use std::{ collections::HashMap, - sync::{Arc, Mutex}, time::{SystemTime, UNIX_EPOCH}, }; - use crate::tags::provider::Provider; - use crate::traces::trace_processor::{self, TraceProcessor}; - use crate::LAMBDA_RUNTIME_SLUG; - use crate::{config::Config, lifecycle::invocation::context::ContextBuffer}; - use datadog_trace_protobuf::pb; - use datadog_trace_utils::{tracer_header_tags, tracer_payload::TracerPayloadCollection}; + use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; + + use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG}; + + use super::*; fn get_current_timestamp_nanos() -> i64 { i64::try_from( @@ -306,7 +291,7 @@ mod tests { dropped_p0_spans: 0, }; - let trace_processor = trace_processor::ServerlessTraceProcessor { + let trace_processor = ServerlessTraceProcessor { resolved_api_key: "foo".to_string(), obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()), }; @@ -319,7 +304,6 @@ mod tests { traces, 100, None, - Arc::new(Mutex::new(ContextBuffer::default())), ); let expected_tracer_payload = pb::TracerPayload { From d60cad2c6db1272d7a657fdc7e6f39cc90f3addf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 27 Mar 2025 22:32:28 -0400 Subject: [PATCH 4/4] fmt --- bottlecap/src/lifecycle/invocation/processor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 0419f0253..38b3d27a4 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -649,14 +649,14 @@ impl Processor { self.enhanced_metrics.increment_oom_metric(timestamp); } - /// Add a tracer span to the context buffer for the given request_id, if present. + /// Add a tracer span to the context buffer for the given `request_id`, if present. /// /// This is used to enrich the invocation span with additional metadata from the tracers /// top level span, since we discard the tracer span when we create the invocation span. - pub fn add_tracer_span(&mut self, tracer_top_level_span: &Span) { - if let Some(request_id) = tracer_top_level_span.meta.get("request_id") { + pub fn add_tracer_span(&mut self, span: &Span) { + if let Some(request_id) = span.meta.get("request_id") { self.context_buffer - .add_tracer_span(request_id, Some(tracer_top_level_span.clone())); + .add_tracer_span(request_id, Some(span.clone())); } } }