Skip to content

Commit 9bbed8a

Browse files
committed
Add MaxGauge metric
1 parent 4093882 commit 9bbed8a

6 files changed

Lines changed: 170 additions & 4 deletions

File tree

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ pub use execution_plans::{
3232
};
3333
pub use metrics::{
3434
AvgLatencyMetric, BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL,
35-
DistributedMetricsFormat, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric,
36-
MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
37-
rewrite_distributed_plan_with_metrics,
35+
DistributedMetricsFormat, FirstLatencyMetric, GaugeMetricExt, LatencyMetricExt, MaxGaugeMetric,
36+
MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric,
37+
P99LatencyMetric, rewrite_distributed_plan_with_metrics,
3838
};
3939
pub use networking::{
4040
BoxCloneSyncChannel, ChannelResolver, DefaultChannelResolver, WorkerResolver,

src/metrics/max_gauge_metric.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use datafusion::physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue};
2+
use std::sync::atomic::Ordering::Relaxed;
3+
use std::{
4+
any::Any,
5+
borrow::Cow,
6+
fmt::{Display, Formatter},
7+
sync::{Arc, atomic::AtomicUsize},
8+
};
9+
10+
/// Extension trait for DataFusion's metric system that adds support for a Gauge metric that
11+
/// aggregates to others using `max` instead of `sum`
12+
pub trait GaugeMetricExt {
13+
fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric;
14+
}
15+
16+
impl GaugeMetricExt for MetricBuilder<'_> {
17+
fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric {
18+
let value = MaxGaugeMetric::default();
19+
self.build(MetricValue::Custom {
20+
name: name.into(),
21+
value: Arc::new(value.clone()),
22+
});
23+
value
24+
}
25+
}
26+
27+
/// A gauge that remembers the largest value it has been given.
///
/// Similar to DataFusion's `Gauge` metric, but instances are combined using `max` instead of
/// `sum`.
///
/// Cloning is cheap: every clone shares the same underlying atomic, so an update made through
/// one handle is observable through all of them.
#[derive(Debug, Clone, Default)]
pub struct MaxGaugeMetric {
    /// Largest value recorded so far; a default gauge starts at `0`.
    value: Arc<AtomicUsize>,
}

impl MaxGaugeMetric {
    /// Creates a gauge whose current maximum is `value`.
    pub fn from_value(value: usize) -> Self {
        Self {
            value: Arc::new(AtomicUsize::new(value)),
        }
    }

    /// Returns the largest value recorded so far.
    pub fn value(&self) -> usize {
        self.value.load(Relaxed)
    }

    /// Raises the gauge to `n` if `n` exceeds the current value; otherwise leaves it untouched.
    pub fn set_max(&self, n: usize) {
        self.value.fetch_max(n, Relaxed);
    }
}

impl Display for MaxGaugeMetric {
    /// Formats the current value exactly as a plain `usize` would be formatted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Delegate to `usize`'s impl rather than `write!(f, "{}", ..)` so that the caller's
        // width, fill, alignment and zero-padding flags are honoured instead of dropped.
        Display::fmt(&self.value(), f)
    }
}
63+
64+
impl CustomMetricValue for MaxGaugeMetric {
65+
fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
66+
Arc::new(MaxGaugeMetric::default())
67+
}
68+
69+
fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
70+
let Some(other) = other.as_any().downcast_ref::<Self>() else {
71+
return;
72+
};
73+
self.value.fetch_max(other.value.load(Relaxed), Relaxed);
74+
}
75+
76+
fn as_any(&self) -> &dyn Any {
77+
self
78+
}
79+
80+
fn as_usize(&self) -> usize {
81+
self.value()
82+
}
83+
84+
fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
85+
let Some(other) = other.as_any().downcast_ref::<Self>() else {
86+
return false;
87+
};
88+
other.value() == self.value()
89+
}
90+
}
91+
92+
#[cfg(test)]
mod tests {
    use super::*;

    /// A default gauge starts at zero and `set_max` can only ever raise it.
    #[test]
    fn default_is_zero_and_set_max_updates() {
        let gauge = MaxGaugeMetric::default();
        assert_eq!(gauge.value(), 0);

        // (value offered, value expected afterwards): a first update, a lower value that
        // must not decrease the gauge, then a higher value that must increase it.
        let steps = [(1024, 1024), (512, 1024), (2048, 2048)];
        for (offered, expected) in steps {
            gauge.set_max(offered);
            assert_eq!(gauge.value(), expected);
        }
    }

    /// `from_value` seeds the gauge with the given value.
    #[test]
    fn from_value_constructs_correctly() {
        let gauge = MaxGaugeMetric::from_value(1_000_000);
        assert_eq!(gauge.value(), 1_000_000);
    }

    /// Aggregation keeps the larger value regardless of which side holds it.
    #[test]
    fn aggregate_takes_max() {
        for (mine, theirs) in [(500, 300), (300, 500)] {
            let target = MaxGaugeMetric::from_value(mine);
            target.aggregate(Arc::new(MaxGaugeMetric::from_value(theirs)));
            assert_eq!(target.value(), 500);
        }
    }
}

src/metrics/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod bytes_metric;
22
mod latency_metric;
3+
mod max_gauge_metric;
34
pub(crate) mod proto;
45
mod task_metrics_collector;
56
mod task_metrics_rewriter;
@@ -9,6 +10,7 @@ pub use latency_metric::{
910
AvgLatencyMetric, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric, MinLatencyMetric,
1011
P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
1112
};
13+
pub use max_gauge_metric::{GaugeMetricExt, MaxGaugeMetric};
1214
pub(crate) use task_metrics_collector::collect_plan_metrics;
1315
pub use task_metrics_rewriter::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
1416
/// Label used to annotate metrics in execution plan nodes with the task in which they were executed.

src/metrics/proto.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use super::latency_metric::{
1313
AvgLatencyMetric, FirstLatencyMetric, MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric,
1414
P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
1515
};
16+
use crate::MaxGaugeMetric;
1617
use crate::worker::generated::worker as pb;
1718

1819
/// df_metrics_set_to_proto converts a [datafusion::physical_plan::metrics::MetricsSet] to a [pb::MetricsSet].
@@ -239,6 +240,15 @@ pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<pb::Metric, DataFusionE
239240
partition,
240241
labels,
241242
})
243+
} else if let Some(max_gauge) = value.as_any().downcast_ref::<MaxGaugeMetric>() {
244+
Ok(pb::Metric {
245+
value: Some(pb::metric::Value::MaxGauge(pb::MaxGauge {
246+
name: name.to_string(),
247+
value: max_gauge.value() as u64,
248+
})),
249+
partition,
250+
labels,
251+
})
242252
} else {
243253
internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED)
244254
}
@@ -554,6 +564,17 @@ pub fn metric_proto_to_df(metric: pb::Metric) -> Result<Arc<Metric>, DataFusionE
554564
labels,
555565
)))
556566
}
567+
Some(pb::metric::Value::MaxGauge(gauge)) => {
568+
let value = MaxGaugeMetric::from_value(gauge.value as usize);
569+
Ok(Arc::new(Metric::new_with_labels(
570+
MetricValue::Custom {
571+
name: Cow::Owned(gauge.name),
572+
value: Arc::new(value),
573+
},
574+
partition,
575+
labels,
576+
)))
577+
}
557578
None => internal_err!("proto metric is missing the metric field"),
558579
}
559580
}

src/worker/generated/worker.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pub struct Metric {
159159
pub partition: ::core::option::Option<u64>,
160160
#[prost(
161161
oneof = "metric::Value",
162-
tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33"
162+
tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
163163
)]
164164
pub value: ::core::option::Option<metric::Value>,
165165
}
@@ -215,6 +215,8 @@ pub mod metric {
215215
CustomP95Latency(super::PercentileLatency),
216216
#[prost(message, tag = "33")]
217217
CustomP99Latency(super::PercentileLatency),
218+
#[prost(message, tag = "34")]
219+
MaxGauge(super::MaxGauge),
218220
}
219221
}
220222
/// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents
@@ -357,6 +359,13 @@ pub struct PercentileLatency {
357359
#[prost(bytes = "vec", tag = "4")]
358360
pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
359361
}
362+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
363+
pub struct MaxGauge {
364+
#[prost(string, tag = "1")]
365+
pub name: ::prost::alloc::string::String,
366+
#[prost(uint64, tag = "2")]
367+
pub value: u64,
368+
}
360369
/// Generated client implementations.
361370
pub mod worker_service_client {
362371
#![allow(

src/worker/worker.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ message Metric {
180180
PercentileLatency custom_p75_latency = 31;
181181
PercentileLatency custom_p95_latency = 32;
182182
PercentileLatency custom_p99_latency = 33;
183+
MaxGauge max_gauge = 34;
183184
}
184185
}
185186

@@ -286,3 +287,8 @@ message PercentileLatency {
286287
string name = 1;
287288
bytes sketch_bytes = 4;
288289
}
290+
291+
message MaxGauge {
292+
string name = 1;
293+
uint64 value = 2;
294+
}

0 commit comments

Comments
 (0)