Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use bottlecap::{
event_bus::bus::EventBus,
events::Event,
lifecycle::{
flush_control::FlushControl,
invocation::{context::ContextBuffer, processor::Processor as InvocationProcessor},
flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor,
listener::Listener as LifecycleListener,
},
logger,
Expand Down Expand Up @@ -330,20 +329,18 @@ async fn extension_loop_active(
let mut metrics_flusher =
start_metrics_flusher(resolved_api_key.clone(), &metrics_aggr, config);
// Lifecycle Invocation Processor
let context_buffer = Arc::new(Mutex::new(ContextBuffer::default()));
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
aws_config,
Arc::clone(&metrics_aggr),
Arc::clone(&context_buffer),
)));

let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) = start_trace_agent(
config,
resolved_api_key.clone(),
&tags_provider,
context_buffer.clone(),
Arc::clone(&invocation_processor),
);

let lifecycle_listener = LifecycleListener {
Expand Down Expand Up @@ -695,7 +692,7 @@ fn start_trace_agent(
config: &Arc<Config>,
resolved_api_key: String,
tags_provider: &Arc<TagProvider>,
context_buffer: Arc<Mutex<ContextBuffer>>,
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
) -> (
Sender<datadog_trace_utils::send_data::SendData>,
Arc<trace_flusher::ServerlessTraceFlusher>,
Expand Down Expand Up @@ -739,8 +736,8 @@ fn start_trace_agent(
trace_processor.clone(),
stats_aggregator,
stats_processor,
invocation_processor,
Arc::clone(tags_provider),
context_buffer,
resolved_api_key,
));
let trace_agent_channel = trace_agent.get_sender_copy();
Expand Down
48 changes: 23 additions & 25 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const TAG_SAMPLING_PRIORITY: &str = "_sampling_priority_v1";

pub struct Processor {
// Buffer containing context of the previous 5 invocations
context_buffer: Arc<Mutex<ContextBuffer>>,
context_buffer: ContextBuffer,
// Helper to infer span information
inferrer: SpanInferrer,
// Current invocation span
Expand Down Expand Up @@ -79,7 +79,6 @@ impl Processor {
config: Arc<config::Config>,
aws_config: &AwsConfig,
metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
context_buffer: Arc<Mutex<ContextBuffer>>,
) -> Self {
let service = config.service.clone().unwrap_or(String::from("aws.lambda"));
let resource = tags_provider
Expand All @@ -89,7 +88,7 @@ impl Processor {
let propagator = DatadogCompositePropagator::new(Arc::clone(&config));

Processor {
context_buffer,
context_buffer: ContextBuffer::default(),
inferrer: SpanInferrer::new(config.service_mapping.clone()),
span: create_empty_span(String::from("aws.lambda"), resource, service),
cold_start_span: None,
Expand All @@ -115,8 +114,7 @@ impl Processor {
self.reset_state();
self.set_init_tags();

let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.create_context(&request_id);
self.context_buffer.create_context(&request_id);
if self.config.enhanced_metrics {
// Collect offsets for network and cpu metrics
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
Expand All @@ -139,9 +137,9 @@ impl Processor {
tmp_chan_tx,
process_chan_tx,
});
context_buffer.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
}
drop(context_buffer);

// Increment the invocation metric
self.enhanced_metrics.increment_invocation_metric(timestamp);
Expand Down Expand Up @@ -173,8 +171,7 @@ impl Processor {
let mut cold_start = false;

// If it's empty, then we are in a cold start
let context_buffer = self.context_buffer.lock().expect("lock poisoned");
if context_buffer.is_empty() {
if self.context_buffer.is_empty() {
let now = Instant::now();
let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time);
if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() {
Expand All @@ -188,7 +185,6 @@ impl Processor {
self.enhanced_metrics.set_runtime_tag(&runtime);
self.runtime = Some(runtime);
}
drop(context_buffer);

if proactive_initialization {
self.span.meta.insert(
Expand Down Expand Up @@ -256,8 +252,7 @@ impl Processor {
.as_nanos()
.try_into()
.unwrap_or_default();
let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.add_start_time(&request_id, start_time);
self.context_buffer.add_start_time(&request_id, start_time);
self.span.start = start_time;
}

Expand Down Expand Up @@ -302,9 +297,9 @@ impl Processor {

{
// Scope to drop the lock on context_buffer
let mut context_buffer = self.context_buffer.lock().expect("lock poisoned");
context_buffer.add_runtime_duration(request_id, metrics.duration_ms);
if let Some(context) = context_buffer.get(request_id) {
self.context_buffer
.add_runtime_duration(request_id, metrics.duration_ms);
if let Some(context) = self.context_buffer.get(request_id) {
// `round` is intentionally meant to be a whole integer
self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
self.span
Expand Down Expand Up @@ -385,7 +380,6 @@ impl Processor {
vec![traces],
body_size,
self.inferrer.span_pointers.clone(),
self.context_buffer.clone(),
);

if let Err(e) = trace_agent_tx.send(send_data).await {
Expand All @@ -410,8 +404,7 @@ impl Processor {
self.enhanced_metrics
.set_report_log_metrics(&metrics, timestamp);

let context_buffer = self.context_buffer.lock().expect("lock poisoned");
if let Some(context) = context_buffer.get(request_id) {
if let Some(context) = self.context_buffer.get(request_id) {
if context.runtime_duration_ms != 0.0 {
let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;

Expand Down Expand Up @@ -655,6 +648,17 @@ impl Processor {
pub fn on_out_of_memory_error(&mut self, timestamp: i64) {
self.enhanced_metrics.increment_oom_metric(timestamp);
}

/// Add a tracer span to the context buffer for the given `request_id`, if present.
///
/// This is used to enrich the invocation span with additional metadata from the tracers
/// top level span, since we discard the tracer span when we create the invocation span.
pub fn add_tracer_span(&mut self, span: &Span) {
if let Some(request_id) = span.meta.get("request_id") {
self.context_buffer
.add_tracer_span(request_id, Some(span.clone()));
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -693,13 +697,7 @@ mod tests {
Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"),
));

Processor::new(
tags_provider,
config,
&aws_config,
metrics_aggregator,
Arc::new(Mutex::new(ContextBuffer::default())),
)
Processor::new(tags_provider, config, &aws_config, metrics_aggregator)
}

#[test]
Expand Down
38 changes: 24 additions & 14 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ use serde_json::json;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex as SyncMutex;
use std::time::Instant;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tracing::{debug, error};

use crate::config;
use crate::http_client;
use crate::lifecycle::invocation::context::ContextBuffer;
use crate::lifecycle::invocation::processor::Processor as InvocationProcessor;
use crate::tags::provider;
use crate::traces::{stats_aggregator, stats_processor, trace_aggregator, trace_processor};
use crate::traces::{
stats_aggregator, stats_processor, trace_aggregator, trace_processor, INVOCATION_SPAN_RESOURCE,
};
use datadog_trace_mini_agent::http_utils::{
self, log_and_create_http_response, log_and_create_traces_success_http_response,
};
Expand All @@ -45,7 +46,7 @@ pub struct TraceAgent {
pub stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
pub tags_provider: Arc<provider::Provider>,
pub context_buffer: Arc<SyncMutex<ContextBuffer>>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
http_client: reqwest::Client,
api_key: String,
tx: Sender<SendData>,
Expand All @@ -66,8 +67,8 @@ impl TraceAgent {
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
context_buffer: Arc<SyncMutex<ContextBuffer>>,
resolved_api_key: String,
) -> TraceAgent {
// setup a channel to send processed traces to our flusher. tx is passed through each
Expand All @@ -91,8 +92,8 @@ impl TraceAgent {
trace_processor,
stats_aggregator,
stats_processor,
invocation_processor,
tags_provider,
context_buffer,
http_client: http_client::get_client(config),
tx: trace_tx,
api_key: resolved_api_key,
Expand Down Expand Up @@ -123,7 +124,7 @@ impl TraceAgent {
let stats_processor = self.stats_processor.clone();
let endpoint_config = self.config.clone();
let tags_provider = self.tags_provider.clone();
let context_buffer = self.context_buffer.clone();
let invocation_processor = self.invocation_processor.clone();
let client = self.http_client.clone();
let api_key = self.api_key.clone();

Expand All @@ -136,7 +137,7 @@ impl TraceAgent {

let endpoint_config = endpoint_config.clone();
let tags_provider = tags_provider.clone();
let context_buffer = context_buffer.clone();
let invocation_processor = invocation_processor.clone();
let client = client.clone();
let api_key = api_key.clone();

Expand All @@ -148,8 +149,8 @@ impl TraceAgent {
trace_tx.clone(),
stats_processor.clone(),
stats_tx.clone(),
invocation_processor.clone(),
tags_provider.clone(),
context_buffer.clone(),
client.clone(),
api_key.clone(),
)
Expand Down Expand Up @@ -187,8 +188,8 @@ impl TraceAgent {
trace_tx: Sender<SendData>,
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
stats_tx: Sender<pb::ClientStatsPayload>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
context_buffer: Arc<SyncMutex<ContextBuffer>>,
client: reqwest::Client,
api_key: String,
) -> http::Result<Response<Body>> {
Expand All @@ -198,9 +199,9 @@ impl TraceAgent {
req,
trace_processor.clone(),
trace_tx,
invocation_processor.clone(),
tags_provider,
ApiVersion::V04,
context_buffer.clone(),
)
.await
{
Expand All @@ -215,9 +216,9 @@ impl TraceAgent {
req,
trace_processor.clone(),
trace_tx,
invocation_processor.clone(),
tags_provider,
ApiVersion::V05,
context_buffer.clone(),
)
.await
{
Expand Down Expand Up @@ -276,9 +277,9 @@ impl TraceAgent {
req: Request<Body>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_tx: Sender<SendData>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
version: ApiVersion,
context_buffer: Arc<SyncMutex<ContextBuffer>>,
) -> http::Result<Response<Body>> {
let (parts, body) = req.into_parts();

Expand Down Expand Up @@ -313,14 +314,23 @@ impl TraceAgent {
},
};

// Search for trace invocation span and send it to the invocation processor
for chunk in &traces {
for span in chunk {
if span.resource == INVOCATION_SPAN_RESOURCE {
let mut invocation_processor = invocation_processor.lock().await;
invocation_processor.add_tracer_span(span);
}
}
}

let send_data = trace_processor.process_traces(
config,
tags_provider,
tracer_header_tags,
traces,
body_size,
None,
context_buffer,
);

// send trace payload to our trace flusher
Expand Down
Loading
Loading