Skip to content

Commit ce7fb75

Browse files
committed
Add MaxGauge metric
1 parent a246c29 commit ce7fb75

6 files changed

Lines changed: 174 additions & 8 deletions

File tree

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ pub use execution_plans::{
3232
};
3333
pub use metrics::{
3434
AvgLatencyMetric, BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL,
35-
DistributedMetricsFormat, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric,
36-
MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
37-
rewrite_distributed_plan_with_metrics,
35+
DistributedMetricsFormat, FirstLatencyMetric, GaugeMetricExt, LatencyMetricExt, MaxGaugeMetric,
36+
MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric, P75LatencyMetric, P95LatencyMetric,
37+
P99LatencyMetric, rewrite_distributed_plan_with_metrics,
3838
};
3939
pub use networking::{
4040
BoxCloneSyncChannel, ChannelResolver, DefaultChannelResolver, WorkerResolver,

src/metrics/max_gauge_metric.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use datafusion::physical_plan::metrics::{CustomMetricValue, MetricBuilder, MetricValue};
2+
use std::sync::atomic::Ordering::Relaxed;
3+
use std::{
4+
any::Any,
5+
borrow::Cow,
6+
fmt::{Display, Formatter},
7+
sync::{Arc, atomic::AtomicUsize},
8+
};
9+
10+
/// Extension trait for DataFusion's metric system that adds support for a gauge metric that
11+
/// aggregates with other instances using `max` instead of `sum`.
12+
pub trait GaugeMetricExt {
13+
/// Consumes the builder and returns a [`MaxGaugeMetric`] handle for the metric called `name`.
fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric;
14+
}
15+
16+
impl GaugeMetricExt for MetricBuilder<'_> {
17+
/// Creates a [`MaxGaugeMetric`] starting at its default value, hands a clone of it to
/// `self.build` wrapped in a `MetricValue::Custom` named `name`, and returns the original.
///
/// The clone and the returned handle share the same `Arc<AtomicUsize>`, so values recorded
/// through the returned handle are observable through the one passed to `build`.
fn max_gauge(self, name: impl Into<Cow<'static, str>>) -> MaxGaugeMetric {
18+
let value = MaxGaugeMetric::default();
19+
self.build(MetricValue::Custom {
20+
name: name.into(),
21+
value: Arc::new(value.clone()),
22+
});
23+
value
24+
}
25+
}
26+
/// Similar to DataFusion's Gauge metric, but aggregates between instances using `max` instead of
/// `sum`.
///
/// Cloning yields another handle to the same underlying atomic value, so an update made through
/// any clone is visible through all of them. The default gauge starts at `0`.
#[derive(Debug, Clone, Default)]
pub struct MaxGaugeMetric {
    value: Arc<AtomicUsize>,
}

impl MaxGaugeMetric {
    /// Creates a gauge whose initial value is `value`.
    pub fn from_value(value: usize) -> Self {
        Self {
            value: Arc::new(AtomicUsize::new(value)),
        }
    }

    /// Returns the value currently stored in the gauge.
    pub fn value(&self) -> usize {
        self.value.load(Relaxed)
    }

    /// Raises the gauge to `n` if `n` is greater than the stored value; a smaller or equal `n`
    /// leaves the gauge unchanged.
    pub fn set_max(&self, n: usize) {
        self.value.fetch_max(n, Relaxed);
    }
}

/// Formats the gauge as its current numeric value.
impl Display for MaxGaugeMetric {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.value())
    }
}
63+
64+
impl CustomMetricValue for MaxGaugeMetric {
65+
/// Returns a fresh gauge at the default value, independent of `self`.
fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
66+
Arc::new(MaxGaugeMetric::default())
67+
}
68+
69+
/// Folds `other` into `self` by keeping the larger of the two values.
///
/// If `other` is not a `MaxGaugeMetric`, it is ignored and `self` is left unchanged.
fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
70+
let Some(other) = other.as_any().downcast_ref::<Self>() else {
71+
return;
72+
};
73+
self.value.fetch_max(other.value.load(Relaxed), Relaxed);
74+
}
75+
76+
/// Exposes `self` as `&dyn Any` so callers can downcast back to `MaxGaugeMetric`.
fn as_any(&self) -> &dyn Any {
77+
self
78+
}
79+
80+
/// Returns the current gauge value.
fn as_usize(&self) -> usize {
81+
self.value()
82+
}
83+
84+
/// Returns `true` only when `other` is a `MaxGaugeMetric` holding the same current value;
/// any other metric type compares as not equal.
fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
85+
let Some(other) = other.as_any().downcast_ref::<Self>() else {
86+
return false;
87+
};
88+
other.value() == self.value()
89+
}
90+
}
91+
92+
// Unit tests for `MaxGaugeMetric`: default value, `set_max` monotonicity, `from_value`, and
// max-based aggregation.
#[cfg(test)]
93+
mod tests {
94+
use super::*;
95+
96+
#[test]
97+
fn default_is_zero_and_set_max_updates() {
98+
let m = MaxGaugeMetric::default();
99+
assert_eq!(m.value(), 0);
100+
m.set_max(1024);
101+
assert_eq!(m.value(), 1024);
102+
// Lower value should not decrease the gauge
103+
m.set_max(512);
104+
assert_eq!(m.value(), 1024);
105+
// Higher value should increase it
106+
m.set_max(2048);
107+
assert_eq!(m.value(), 2048);
108+
}
109+
110+
#[test]
111+
fn from_value_constructs_correctly() {
112+
let m = MaxGaugeMetric::from_value(1_000_000);
113+
assert_eq!(m.value(), 1_000_000);
114+
}
115+
116+
#[test]
117+
fn aggregate_takes_max() {
118+
// Aggregating a smaller value leaves the receiver unchanged
let a = MaxGaugeMetric::from_value(500);
119+
let b = MaxGaugeMetric::from_value(300);
120+
a.aggregate(Arc::new(b));
121+
assert_eq!(a.value(), 500);
122+
123+
// Aggregating a larger value raises the receiver to it
let a = MaxGaugeMetric::from_value(300);
124+
let b = MaxGaugeMetric::from_value(500);
125+
a.aggregate(Arc::new(b));
126+
assert_eq!(a.value(), 500);
127+
}
128+
}

src/metrics/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod bytes_metric;
22
mod latency_metric;
3+
mod max_gauge_metric;
34
pub(crate) mod proto;
45
mod task_metrics_collector;
56
mod task_metrics_rewriter;
@@ -9,6 +10,7 @@ pub use latency_metric::{
910
AvgLatencyMetric, FirstLatencyMetric, LatencyMetricExt, MaxLatencyMetric, MinLatencyMetric,
1011
P50LatencyMetric, P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
1112
};
13+
pub use max_gauge_metric::{GaugeMetricExt, MaxGaugeMetric};
1214
pub(crate) use task_metrics_collector::collect_plan_metrics;
1315
pub use task_metrics_rewriter::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
1416
/// Label used to annotate metrics in execution plan nodes with the task in which they were executed.

src/metrics/proto.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use super::latency_metric::{
1313
AvgLatencyMetric, FirstLatencyMetric, MaxLatencyMetric, MinLatencyMetric, P50LatencyMetric,
1414
P75LatencyMetric, P95LatencyMetric, P99LatencyMetric,
1515
};
16+
use crate::MaxGaugeMetric;
1617
use crate::worker::generated::worker as pb;
1718

1819
/// df_metrics_set_to_proto converts a [datafusion::physical_plan::metrics::MetricsSet] to a [pb::MetricsSet].
@@ -239,6 +240,15 @@ pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<pb::Metric, DataFusionE
239240
partition,
240241
labels,
241242
})
243+
} else if let Some(max_gauge) = value.as_any().downcast_ref::<MaxGaugeMetric>() {
244+
Ok(pb::Metric {
245+
value: Some(pb::metric::Value::CustomMaxGauge(pb::MaxGauge {
246+
name: name.to_string(),
247+
value: max_gauge.value() as u64,
248+
})),
249+
partition,
250+
labels,
251+
})
242252
} else {
243253
internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED)
244254
}
@@ -554,6 +564,17 @@ pub fn metric_proto_to_df(metric: pb::Metric) -> Result<Arc<Metric>, DataFusionE
554564
labels,
555565
)))
556566
}
567+
Some(pb::metric::Value::CustomMaxGauge(gauge)) => {
568+
let value = MaxGaugeMetric::from_value(gauge.value as usize);
569+
Ok(Arc::new(Metric::new_with_labels(
570+
MetricValue::Custom {
571+
name: Cow::Owned(gauge.name),
572+
value: Arc::new(value),
573+
},
574+
partition,
575+
labels,
576+
)))
577+
}
557578
None => internal_err!("proto metric is missing the metric field"),
558579
}
559580
}

src/worker/generated/worker.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,16 @@ pub struct WorkUnit {
107107
/// Arbitrary user-defined data (e.g., a file address) necessary during execution.
108108
#[prost(bytes = "vec", tag = "3")]
109109
pub body: ::prost::alloc::vec::Vec<u8>,
110-
/// Unix timestamp in nanoseconds at which this message was created.
110+
/// Unix timestamp in nanoseconds at which this message was created in the coordinator.
111111
#[prost(uint64, tag = "4")]
112112
pub created_timestamp_unix_nanos: u64,
113-
/// Unix timestamp in nanoseconds at which this message was sent.
113+
/// Unix timestamp in nanoseconds at which this message was sent by the coordinator.
114114
#[prost(uint64, tag = "5")]
115115
pub sent_timestamp_unix_nanos: u64,
116-
/// Unix timestamp in nanoseconds at which this message was received.
116+
/// Unix timestamp in nanoseconds at which this message was received by a worker.
117117
#[prost(uint64, tag = "6")]
118118
pub received_timestamp_unix_nanos: u64,
119-
/// Unix timestamp in nanoseconds at which this message was processed.
119+
/// Unix timestamp in nanoseconds at which this message started being processed.
120120
#[prost(uint64, tag = "7")]
121121
pub processed_timestamp_unix_nanos: u64,
122122
}
@@ -171,7 +171,7 @@ pub struct Metric {
171171
pub partition: ::core::option::Option<u64>,
172172
#[prost(
173173
oneof = "metric::Value",
174-
tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33"
174+
tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
175175
)]
176176
pub value: ::core::option::Option<metric::Value>,
177177
}
@@ -227,6 +227,8 @@ pub mod metric {
227227
CustomP95Latency(super::PercentileLatency),
228228
#[prost(message, tag = "33")]
229229
CustomP99Latency(super::PercentileLatency),
230+
#[prost(message, tag = "34")]
231+
CustomMaxGauge(super::MaxGauge),
230232
}
231233
}
232234
/// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents
@@ -369,6 +371,13 @@ pub struct PercentileLatency {
369371
#[prost(bytes = "vec", tag = "4")]
370372
pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
371373
}
374+
/// Wire representation of a max-aggregated gauge metric: the metric name plus its value as a
/// `u64`.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
375+
pub struct MaxGauge {
376+
/// Name of the metric.
#[prost(string, tag = "1")]
377+
pub name: ::prost::alloc::string::String,
378+
/// Value of the gauge.
#[prost(uint64, tag = "2")]
379+
pub value: u64,
380+
}
372381
/// Generated client implementations.
373382
pub mod worker_service_client {
374383
#![allow(

src/worker/worker.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ message Metric {
188188
PercentileLatency custom_p75_latency = 31;
189189
PercentileLatency custom_p95_latency = 32;
190190
PercentileLatency custom_p99_latency = 33;
191+
MaxGauge custom_max_gauge = 34;
191192
}
192193
}
193194

@@ -294,3 +295,8 @@ message PercentileLatency {
294295
string name = 1;
295296
bytes sketch_bytes = 4;
296297
}
298+
299+
// Wire representation of a max-aggregated gauge metric: the metric name plus its value.
message MaxGauge {
300+
// Name of the metric.
string name = 1;
301+
// Value of the gauge.
uint64 value = 2;
302+
}

0 commit comments

Comments
 (0)