Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ async fn handle_event_bus_event(
let mut p = invocation_processor.lock().await;
p.on_platform_runtime_done(
request_id,
metrics.duration_ms,
metrics,
status,
config.clone(),
tags_provider.clone(),
Expand Down
8 changes: 4 additions & 4 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
CPUData, NetworkData,
},
tags::{lambda::tags::resolve_runtime_from_proc, provider},
telemetry::events::{ReportMetrics, Status},
telemetry::events::{ReportMetrics, RuntimeDoneMetrics, Status},
traces::{
context::SpanContext,
propagation::{
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Processor {
pub async fn on_platform_runtime_done(
&mut self,
request_id: &String,
duration_ms: f64,
metrics: RuntimeDoneMetrics,
status: Status,
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
Expand All @@ -270,11 +270,11 @@ impl Processor {
timestamp: i64,
) {
self.context_buffer
.add_runtime_duration(request_id, duration_ms);
.add_runtime_duration(request_id, metrics.duration_ms);

// Set the runtime duration metric
self.enhanced_metrics
.set_runtime_duration_metric(duration_ms, timestamp);
.set_runtime_done_metrics(&metrics, timestamp);

if status != Status::Success {
// Increment the error metric
Expand Down
60 changes: 56 additions & 4 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::metrics::enhanced::{
statfs::statfs_info,
};
use crate::proc::{self, CPUData, NetworkData};
use crate::telemetry::events::ReportMetrics;
use crate::telemetry::events::{ReportMetrics, RuntimeDoneMetrics};
use dogstatsd::metric;
use dogstatsd::metric::{Metric, MetricValue};
use dogstatsd::{aggregator::Aggregator, metric::SortedTags};
Expand Down Expand Up @@ -127,13 +127,13 @@ impl Lambda {
}
}

pub fn set_runtime_duration_metric(&self, duration_ms: f64, timestamp: i64) {
pub fn set_runtime_done_metrics(&self, metrics: &RuntimeDoneMetrics, timestamp: i64) {
if !self.config.enhanced_metrics {
return;
}
let metric = Metric::new(
constants::RUNTIME_DURATION_METRIC.into(),
MetricValue::distribution(duration_ms),
MetricValue::distribution(metrics.duration_ms),
// Datadog expects this value as milliseconds, not seconds
self.get_dynamic_value_tags(),
Some(timestamp),
Expand All @@ -146,6 +146,24 @@ impl Lambda {
{
error!("failed to insert runtime duration metric: {}", e);
}

if let Some(produced_bytes) = metrics.produced_bytes {
let metric = Metric::new(
constants::PRODUCED_BYTES_METRIC.into(),
MetricValue::distribution(produced_bytes as f64),
// Datadog expects this value as milliseconds, not seconds
self.get_dynamic_value_tags(),
Some(timestamp),
);
if let Err(e) = self
.aggregator
.lock()
.expect("lock poisoned")
.insert(metric)
{
error!("failed to insert produced bytes metric: {}", e);
}
}
}

pub fn set_post_runtime_duration_metric(&self, duration_ms: f64, timestamp: i64) {
Expand Down Expand Up @@ -842,7 +860,13 @@ mod tests {
lambda.increment_errors_metric(now);
lambda.increment_timeout_metric(now);
lambda.set_init_duration_metric(100.0, now);
lambda.set_runtime_duration_metric(100.0, now);
lambda.set_runtime_done_metrics(
&RuntimeDoneMetrics {
duration_ms: 100.0,
produced_bytes: Some(42 as u64),
},
now,
);
lambda.set_post_runtime_duration_metric(100.0, now);
lambda.set_report_log_metrics(
&ReportMetrics {
Expand Down Expand Up @@ -871,6 +895,9 @@ mod tests {
assert!(aggr
.get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), &None, now)
.is_none());
assert!(aggr
.get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), &None, now)
.is_none());
assert!(aggr
.get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), &None, now)
.is_none());
Expand Down Expand Up @@ -949,6 +976,31 @@ mod tests {
.is_none());
}

#[test]
fn test_set_runtime_done_metrics() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);
let runtime_done_metrics = RuntimeDoneMetrics {
duration_ms: 100.0,
produced_bytes: Some(42 as u64),
};
let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
lambda.set_runtime_done_metrics(&runtime_done_metrics, now);

assert_sketch(
&metrics_aggr,
constants::RUNTIME_DURATION_METRIC,
100.0,
now,
);
assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now);
}

#[test]
fn test_set_report_log_metrics() {
let (metrics_aggr, my_config) = setup();
Expand Down
Loading