Skip to content

Commit d2745b8

Browse files
authored
feat: Support defining custom MetricValues in PhysicalPlans (apache#16195) (#30)
1 parent ca48c4d commit d2745b8

3 files changed

Lines changed: 296 additions & 7 deletions

File tree

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Custom metric value type.
19+
20+
use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
21+
22+
/// A trait for implementing custom metric values.
23+
///
24+
/// This trait enables defining application- or operator-specific metric types
25+
/// that can be aggregated and displayed alongside standard metrics. These
26+
/// custom metrics integrate with [`MetricValue::Custom`] and support
27+
/// aggregation logic, introspection, and optional numeric representation.
28+
///
29+
/// # Requirements
30+
/// Implementations of `CustomMetricValue` must satisfy the following:
31+
///
32+
/// 1. [`Self::aggregate`]: Defines how two metric values are combined
33+
/// 2. [`Self::new_empty`]: Returns a new, zero-value instance for accumulation
34+
/// 3. [`Self::as_any`]: Enables dynamic downcasting for type-specific operations
35+
/// 4. [`Self::as_usize`]: Optionally maps the value to a `usize` (for sorting, display, etc.)
36+
/// 5. [`Self::is_eq`]: Implements comparison between two values, this isn't reusing the std
37+
/// PartialEq trait because this trait is used dynamically in the context of
38+
/// [`MetricValue::Custom`]
39+
///
40+
/// # Examples
41+
/// ```
42+
/// # use std::sync::Arc;
43+
/// # use std::fmt::{Debug, Display};
44+
/// # use std::any::Any;
45+
/// # use std::sync::atomic::{AtomicUsize, Ordering};
46+
///
47+
/// # use datafusion_physical_plan::metrics::CustomMetricValue;
48+
///
49+
/// #[derive(Debug, Default)]
50+
/// struct MyCounter {
51+
/// count: AtomicUsize,
52+
/// }
53+
///
54+
/// impl Display for MyCounter {
55+
/// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
56+
/// write!(f, "count: {}", self.count.load(Ordering::Relaxed))
57+
/// }
58+
/// }
59+
///
60+
/// impl CustomMetricValue for MyCounter {
61+
/// fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
62+
/// Arc::new(Self::default())
63+
/// }
64+
///
65+
/// fn aggregate(&self, other: Arc<dyn CustomMetricValue>) {
66+
/// let other = other.as_any().downcast_ref::<Self>().unwrap();
67+
/// self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
68+
/// }
69+
///
70+
/// fn as_any(&self) -> &dyn Any {
71+
/// self
72+
/// }
73+
///
74+
/// fn as_usize(&self) -> usize {
75+
/// self.count.load(Ordering::Relaxed)
76+
/// }
77+
///
78+
/// fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
79+
/// let Some(other) = other.as_any().downcast_ref::<Self>() else {
80+
/// return false;
81+
/// };
82+
///
83+
/// self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
84+
/// }
85+
/// }
86+
/// ```
87+
///
88+
/// [`MetricValue::Custom`]: super::MetricValue::Custom
89+
pub trait CustomMetricValue: Display + Debug + Send + Sync {
90+
/// Returns a new, zero-initialized version of this metric value.
91+
///
92+
/// This value is used during metric aggregation to accumulate results.
93+
fn new_empty(&self) -> Arc<dyn CustomMetricValue>;
94+
95+
/// Merges another metric value into this one.
96+
///
97+
/// The type of `other` could be of a different custom type as long as it's aggregatable into self.
98+
fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>);
99+
100+
/// Returns this value as a [`Any`] to support dynamic downcasting.
101+
fn as_any(&self) -> &dyn Any;
102+
103+
/// Optionally returns a numeric representation of the value, if meaningful.
104+
/// Otherwise will default to zero.
105+
///
106+
/// This is used for sorting and summarizing metrics.
107+
fn as_usize(&self) -> usize {
108+
0
109+
}
110+
111+
/// Compares this value with another custom value.
112+
fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool;
113+
}

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
mod baseline;
2121
mod builder;
22+
mod custom;
2223
mod value;
2324

2425
use parking_lot::Mutex;
@@ -33,6 +34,7 @@ use datafusion_common::HashMap;
3334
// public exports
3435
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics};
3536
pub use builder::MetricBuilder;
37+
pub use custom::CustomMetricValue;
3638
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
3739

3840
/// Something that tracks a value of interest (metric) of a DataFusion
@@ -263,6 +265,7 @@ impl MetricsSet {
263265
MetricValue::Gauge { name, .. } => name == metric_name,
264266
MetricValue::StartTimestamp(_) => false,
265267
MetricValue::EndTimestamp(_) => false,
268+
MetricValue::Custom { .. } => false,
266269
})
267270
}
268271

0 commit comments

Comments
 (0)