Skip to content

Commit 3a4b942

Browse files
committed
Optimize metric label cloning
1 parent c8b784a commit 3a4b942

3 files changed

Lines changed: 117 additions & 45 deletions

File tree

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
// under the License.
1717

1818
use datafusion_physical_plan::metrics::{
19-
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType,
20-
PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
19+
Count, ExecutionPlanMetricsSet, Gauge, Label, MetricBuilder, MetricCategory,
20+
MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
2121
};
2222

2323
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -100,46 +100,51 @@ impl ParquetFileMetrics {
100100
filename: &str,
101101
metrics: &ExecutionPlanMetricsSet,
102102
) -> Self {
103+
// Share the filename label across all per-file metrics to avoid
104+
// allocating the same filename string for each metric.
105+
let filename_label = Label::new("filename", filename.to_string());
106+
let builder = MetricBuilder::new(metrics).with_label(filename_label);
107+
103108
// -----------------------
104109
// 'summary' level metrics
105110
// -----------------------
106-
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
107-
.with_new_label("filename", filename.to_string())
111+
let row_groups_pruned_bloom_filter = builder
112+
.clone()
108113
.with_type(MetricType::Summary)
109114
.pruning_metrics("row_groups_pruned_bloom_filter", partition);
110115

111-
let limit_pruned_row_groups = MetricBuilder::new(metrics)
112-
.with_new_label("filename", filename.to_string())
116+
let limit_pruned_row_groups = builder
117+
.clone()
113118
.with_type(MetricType::Summary)
114119
.pruning_metrics("limit_pruned_row_groups", partition);
115120

116-
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
117-
.with_new_label("filename", filename.to_string())
121+
let row_groups_pruned_statistics = builder
122+
.clone()
118123
.with_type(MetricType::Summary)
119124
.pruning_metrics("row_groups_pruned_statistics", partition);
120125

121-
let page_index_pages_pruned = MetricBuilder::new(metrics)
122-
.with_new_label("filename", filename.to_string())
126+
let page_index_pages_pruned = builder
127+
.clone()
123128
.with_type(MetricType::Summary)
124129
.pruning_metrics("page_index_pages_pruned", partition);
125130

126-
let bytes_scanned = MetricBuilder::new(metrics)
127-
.with_new_label("filename", filename.to_string())
131+
let bytes_scanned = builder
132+
.clone()
128133
.with_type(MetricType::Summary)
129134
.with_category(MetricCategory::Bytes)
130135
.counter("bytes_scanned", partition);
131136

132-
let metadata_load_time = MetricBuilder::new(metrics)
133-
.with_new_label("filename", filename.to_string())
137+
let metadata_load_time = builder
138+
.clone()
134139
.with_type(MetricType::Summary)
135140
.subset_time("metadata_load_time", partition);
136141

137142
let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
138143
.with_type(MetricType::Summary)
139144
.pruning_metrics("files_ranges_pruned_statistics", partition);
140145

141-
let scan_efficiency_ratio = MetricBuilder::new(metrics)
142-
.with_new_label("filename", filename.to_string())
146+
let scan_efficiency_ratio = builder
147+
.clone()
143148
.with_type(MetricType::Summary)
144149
.ratio_metrics_with_strategy(
145150
"scan_efficiency_ratio",
@@ -150,45 +155,44 @@ impl ParquetFileMetrics {
150155
// -----------------------
151156
// 'dev' level metrics
152157
// -----------------------
153-
let predicate_evaluation_errors = MetricBuilder::new(metrics)
154-
.with_new_label("filename", filename.to_string())
158+
let predicate_evaluation_errors = builder
159+
.clone()
155160
.with_category(MetricCategory::Rows)
156161
.counter("predicate_evaluation_errors", partition);
157162

158-
let pushdown_rows_pruned = MetricBuilder::new(metrics)
159-
.with_new_label("filename", filename.to_string())
163+
let pushdown_rows_pruned = builder
164+
.clone()
160165
.with_category(MetricCategory::Rows)
161166
.counter("pushdown_rows_pruned", partition);
162-
let pushdown_rows_matched = MetricBuilder::new(metrics)
163-
.with_new_label("filename", filename.to_string())
167+
let pushdown_rows_matched = builder
168+
.clone()
164169
.with_category(MetricCategory::Rows)
165170
.counter("pushdown_rows_matched", partition);
166171

167-
let row_pushdown_eval_time = MetricBuilder::new(metrics)
168-
.with_new_label("filename", filename.to_string())
172+
let row_pushdown_eval_time = builder
173+
.clone()
169174
.subset_time("row_pushdown_eval_time", partition);
170-
let statistics_eval_time = MetricBuilder::new(metrics)
171-
.with_new_label("filename", filename.to_string())
175+
let statistics_eval_time = builder
176+
.clone()
172177
.subset_time("statistics_eval_time", partition);
173-
let bloom_filter_eval_time = MetricBuilder::new(metrics)
174-
.with_new_label("filename", filename.to_string())
178+
let bloom_filter_eval_time = builder
179+
.clone()
175180
.subset_time("bloom_filter_eval_time", partition);
176181

177-
let page_index_eval_time = MetricBuilder::new(metrics)
178-
.with_new_label("filename", filename.to_string())
182+
let page_index_eval_time = builder
183+
.clone()
179184
.subset_time("page_index_eval_time", partition);
180185

181-
let page_index_rows_pruned = MetricBuilder::new(metrics)
182-
.with_new_label("filename", filename.to_string())
186+
let page_index_rows_pruned = builder
187+
.clone()
183188
.pruning_metrics("page_index_rows_pruned", partition);
184189

185-
let predicate_cache_inner_records = MetricBuilder::new(metrics)
186-
.with_new_label("filename", filename.to_string())
190+
let predicate_cache_inner_records = builder
191+
.clone()
187192
.with_category(MetricCategory::Rows)
188193
.gauge("predicate_cache_inner_records", partition);
189194

190-
let predicate_cache_records = MetricBuilder::new(metrics)
191-
.with_new_label("filename", filename.to_string())
195+
let predicate_cache_records = builder
192196
.with_category(MetricCategory::Rows)
193197
.gauge("predicate_cache_records", partition);
194198

datafusion/physical-expr-common/src/metrics/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ use super::{
3131
/// Structure for constructing metrics, counters, timers, etc.
3232
///
3333
/// Note the use of `Cow<..>` is to avoid allocations in the common
34-
/// case of constant strings
34+
/// case of constant strings. Dynamically created label strings are shared when
35+
/// [`Label`] values are cloned.
3536
///
3637
/// ```rust
3738
/// use datafusion_physical_expr_common::metrics::*;
@@ -47,6 +48,7 @@ use super::{
4748
/// .with_new_label("filename", "my_awesome_file.parquet")
4849
/// .counter("num_bytes", partition);
4950
/// ```
51+
#[derive(Clone)]
5052
pub struct MetricBuilder<'a> {
5153
/// Location that the metric created by this builder will be added to
5254
metrics: &'a ExecutionPlanMetricsSet,

datafusion/physical-expr-common/src/metrics/mod.rs

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use parking_lot::Mutex;
3030
use std::{
3131
borrow::Cow,
3232
fmt::{self, Debug, Display},
33+
hash::{Hash, Hasher},
3334
sync::Arc,
3435
vec::IntoIter,
3536
};
@@ -519,12 +520,14 @@ impl From<MetricsSet> for ExecutionPlanMetricsSet {
519520
/// telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md),
520521
/// etc.
521522
///
522-
/// As the name and value are expected to mostly be constant strings,
523-
/// use a [`Cow`] to avoid copying / allocations in this common case.
523+
/// As the name and value are expected to often be constant strings, borrowed
524+
/// static strings avoid allocations in that common case. Dynamic strings are
525+
/// stored behind [`Arc<str>`] so cloning labels does not copy the underlying
526+
/// string data.
524527
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
525528
pub struct Label {
526-
name: Cow<'static, str>,
527-
value: Cow<'static, str>,
529+
name: LabelValue,
530+
value: LabelValue,
528531
}
529532

530533
impl Label {
@@ -533,19 +536,19 @@ impl Label {
533536
name: impl Into<Cow<'static, str>>,
534537
value: impl Into<Cow<'static, str>>,
535538
) -> Self {
536-
let name = name.into();
537-
let value = value.into();
539+
let name = LabelValue::from(name.into());
540+
let value = LabelValue::from(value.into());
538541
Self { name, value }
539542
}
540543

541544
/// Returns the name of this label
542545
pub fn name(&self) -> &str {
543-
self.name.as_ref()
546+
self.name.as_str()
544547
}
545548

546549
/// Returns the value of this label
547550
pub fn value(&self) -> &str {
548-
self.value.as_ref()
551+
self.value.as_str()
549552
}
550553
}
551554

@@ -555,6 +558,60 @@ impl Display for Label {
555558
}
556559
}
557560

561+
/// Internal representation for label names and values.
562+
///
563+
/// `Static` preserves the existing allocation-free path for string literals.
564+
/// `Shared` stores dynamic strings behind [`Arc<str>`], so cloning a [`Label`]
565+
/// only increments an atomic reference count and does not allocate or copy the
566+
/// underlying string data.
567+
#[derive(Clone, Eq)]
568+
enum LabelValue {
569+
Static(&'static str),
570+
Shared(Arc<str>),
571+
}
572+
573+
impl LabelValue {
574+
fn as_str(&self) -> &str {
575+
match self {
576+
Self::Static(value) => value,
577+
Self::Shared(value) => value.as_ref(),
578+
}
579+
}
580+
}
581+
582+
impl From<Cow<'static, str>> for LabelValue {
583+
fn from(value: Cow<'static, str>) -> Self {
584+
match value {
585+
Cow::Borrowed(value) => Self::Static(value),
586+
Cow::Owned(value) => Self::Shared(Arc::from(value)),
587+
}
588+
}
589+
}
590+
591+
impl PartialEq for LabelValue {
592+
fn eq(&self, other: &Self) -> bool {
593+
self.as_str() == other.as_str()
594+
}
595+
}
596+
597+
impl Hash for LabelValue {
598+
fn hash<H: Hasher>(&self, state: &mut H) {
599+
self.as_str().hash(state);
600+
}
601+
}
602+
603+
impl Debug for LabelValue {
604+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
605+
Debug::fmt(self.as_str(), f)
606+
}
607+
}
608+
609+
impl Display for LabelValue {
610+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
611+
Display::fmt(self.as_str(), f)
612+
}
613+
}
614+
558615
#[cfg(test)]
559616
mod tests {
560617
use std::time::Duration;
@@ -609,6 +666,15 @@ mod tests {
609666
assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
610667
}
611668

669+
#[test]
670+
fn test_label_owned_and_borrowed_values_are_equal() {
671+
let borrowed = Label::new("foo", "bar");
672+
let owned = Label::new("foo".to_string(), "bar".to_string());
673+
674+
assert_eq!(borrowed, owned);
675+
assert_eq!(borrowed.to_string(), owned.to_string());
676+
}
677+
612678
#[test]
613679
fn test_output_rows() {
614680
let metrics = ExecutionPlanMetricsSet::new();

0 commit comments

Comments
 (0)