Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ async fn extension_loop_active(
let mut p = invocation_processor.lock().await;
p.on_platform_runtime_done(
&runtime_done_meta.request_id,
runtime_done_meta.metrics.duration_ms,
runtime_done_meta.metrics,
runtime_done_meta.status,
config.clone(),
tags_provider.clone(),
Expand Down Expand Up @@ -444,7 +444,7 @@ async fn extension_loop_active(
let mut p = invocation_processor.lock().await;
p.on_platform_runtime_done(
&runtime_done_meta.request_id,
runtime_done_meta.metrics.duration_ms,
runtime_done_meta.metrics,
runtime_done_meta.status,
config.clone(),
tags_provider.clone(),
Expand Down Expand Up @@ -495,7 +495,7 @@ async fn extension_loop_active(
let mut p = invocation_processor.lock().await;
p.on_platform_runtime_done(
&runtime_done_meta.request_id,
runtime_done_meta.metrics.duration_ms,
runtime_done_meta.metrics,
runtime_done_meta.status,
config.clone(),
tags_provider.clone(),
Expand Down
8 changes: 4 additions & 4 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
CPUData, NetworkData,
},
tags::{lambda::tags::resolve_runtime_from_proc, provider},
telemetry::events::{ReportMetrics, Status},
telemetry::events::{ReportMetrics, RuntimeDoneMetrics, Status},
traces::{
context::SpanContext,
propagation::{
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Processor {
pub async fn on_platform_runtime_done(
&mut self,
request_id: &String,
duration_ms: f64,
metrics: RuntimeDoneMetrics,
status: Status,
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
Expand All @@ -270,11 +270,11 @@ impl Processor {
timestamp: i64,
) {
self.context_buffer
.add_runtime_duration(request_id, duration_ms);
.add_runtime_duration(request_id, metrics.duration_ms);

// Set the runtime duration metric
self.enhanced_metrics
.set_runtime_duration_metric(duration_ms, timestamp);
.set_runtime_done_metrics(&metrics, timestamp);

if status != Status::Success {
// Increment the error metric
Expand Down
60 changes: 56 additions & 4 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::metrics::enhanced::{
statfs::statfs_info,
};
use crate::proc::{self, CPUData, NetworkData};
use crate::telemetry::events::ReportMetrics;
use crate::telemetry::events::{ReportMetrics, RuntimeDoneMetrics};
use dogstatsd::metric;
use dogstatsd::metric::{Metric, MetricValue};
use dogstatsd::{aggregator::Aggregator, metric::SortedTags};
Expand Down Expand Up @@ -127,13 +127,13 @@ impl Lambda {
}
}

pub fn set_runtime_duration_metric(&self, duration_ms: f64, timestamp: i64) {
pub fn set_runtime_done_metrics(&self, metrics: &RuntimeDoneMetrics, timestamp: i64) {
if !self.config.enhanced_metrics {
return;
}
let metric = Metric::new(
constants::RUNTIME_DURATION_METRIC.into(),
MetricValue::distribution(duration_ms),
MetricValue::distribution(metrics.duration_ms),
// Datadog expects this value as milliseconds, not seconds
self.get_dynamic_value_tags(),
Some(timestamp),
Expand All @@ -146,6 +146,24 @@ impl Lambda {
{
error!("failed to insert runtime duration metric: {}", e);
}

if let Some(produced_bytes) = metrics.produced_bytes {
let metric = Metric::new(
constants::PRODUCED_BYTES_METRIC.into(),
MetricValue::distribution(produced_bytes as f64),
// Datadog expects this value as milliseconds, not seconds
self.get_dynamic_value_tags(),
Some(timestamp),
);
if let Err(e) = self
.aggregator
.lock()
.expect("lock poisoned")
.insert(metric)
{
error!("failed to insert produced bytes metric: {}", e);
}
}
}

pub fn set_post_runtime_duration_metric(&self, duration_ms: f64, timestamp: i64) {
Expand Down Expand Up @@ -842,7 +860,13 @@ mod tests {
lambda.increment_errors_metric(now);
lambda.increment_timeout_metric(now);
lambda.set_init_duration_metric(100.0, now);
lambda.set_runtime_duration_metric(100.0, now);
lambda.set_runtime_done_metrics(
&RuntimeDoneMetrics {
duration_ms: 100.0,
produced_bytes: Some(42 as u64),
},
now,
);
lambda.set_post_runtime_duration_metric(100.0, now);
lambda.set_report_log_metrics(
&ReportMetrics {
Expand Down Expand Up @@ -871,6 +895,9 @@ mod tests {
assert!(aggr
.get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), &None, now)
.is_none());
assert!(aggr
.get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), &None, now)
.is_none());
assert!(aggr
.get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), &None, now)
.is_none());
Expand Down Expand Up @@ -949,6 +976,31 @@ mod tests {
.is_none());
}

#[test]
fn test_set_runtime_done_metrics() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);
let runtime_done_metrics = RuntimeDoneMetrics {
duration_ms: 100.0,
produced_bytes: Some(42 as u64),
};
let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
lambda.set_runtime_done_metrics(&runtime_done_metrics, now);

assert_sketch(
&metrics_aggr,
constants::RUNTIME_DURATION_METRIC,
100.0,
now,
);
assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now);
}

#[test]
fn test_set_report_log_metrics() {
let (metrics_aggr, my_config) = setup();
Expand Down