diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 262dde024a527..4bf009afd6d63 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use datafusion_physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType, - PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, + Count, ExecutionPlanMetricsSet, Gauge, Label, MetricBuilder, MetricCategory, + MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, }; /// Stores metrics about the parquet execution for a particular parquet file. @@ -100,37 +102,42 @@ impl ParquetFileMetrics { filename: &str, metrics: &ExecutionPlanMetricsSet, ) -> Self { + // Share the filename label across all per-file metrics to avoid + // allocating the same filename string for each metric. + let filename_label = Label::new("filename", Arc::::from(filename)); + let builder = MetricBuilder::new(metrics).with_label(filename_label); + // ----------------------- // 'summary' level metrics // ----------------------- - let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_groups_pruned_bloom_filter = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_bloom_filter", partition); - let limit_pruned_row_groups = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let limit_pruned_row_groups = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("limit_pruned_row_groups", partition); - let row_groups_pruned_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_groups_pruned_statistics = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_statistics", partition); - let page_index_pages_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_pages_pruned = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("page_index_pages_pruned", partition); - let bytes_scanned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let bytes_scanned = builder + .clone() .with_type(MetricType::Summary) .with_category(MetricCategory::Bytes) .counter("bytes_scanned", partition); - let metadata_load_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let metadata_load_time = builder + .clone() .with_type(MetricType::Summary) .subset_time("metadata_load_time", partition); @@ -138,8 +145,8 @@ impl ParquetFileMetrics { .with_type(MetricType::Summary) .pruning_metrics("files_ranges_pruned_statistics", partition); - let scan_efficiency_ratio = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let scan_efficiency_ratio = builder + .clone() .with_type(MetricType::Summary) .ratio_metrics_with_strategy( "scan_efficiency_ratio", @@ -150,45 +157,44 @@ impl ParquetFileMetrics { // ----------------------- // 'dev' level metrics // ----------------------- - let predicate_evaluation_errors = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_evaluation_errors = builder + .clone() .with_category(MetricCategory::Rows) .counter("predicate_evaluation_errors", partition); - let pushdown_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let pushdown_rows_pruned = builder + .clone() .with_category(MetricCategory::Rows) .counter("pushdown_rows_pruned", partition); - let pushdown_rows_matched = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let pushdown_rows_matched = builder + .clone() .with_category(MetricCategory::Rows) .counter("pushdown_rows_matched", partition); - let row_pushdown_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_pushdown_eval_time = builder + .clone() .subset_time("row_pushdown_eval_time", partition); - let statistics_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let statistics_eval_time = builder + .clone() .subset_time("statistics_eval_time", partition); - let bloom_filter_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let bloom_filter_eval_time = builder + .clone() .subset_time("bloom_filter_eval_time", partition); - let page_index_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_eval_time = builder + .clone() .subset_time("page_index_eval_time", partition); - let page_index_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_rows_pruned = builder + .clone() .pruning_metrics("page_index_rows_pruned", partition); - let predicate_cache_inner_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_cache_inner_records = builder + .clone() .with_category(MetricCategory::Rows) .gauge("predicate_cache_inner_records", partition); - let predicate_cache_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_cache_records = builder .with_category(MetricCategory::Rows) .gauge("predicate_cache_records", partition); diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index e9c0b76af2582..de9d1e03d88df 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -25,13 +25,15 @@ use crate::metrics::{ }; use super::{ - Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp, + Count, ExecutionPlanMetricsSet, Gauge, Label, LabelValue, Metric, MetricValue, Time, + Timestamp, }; /// Structure for constructing metrics, counters, timers, etc. /// /// Note the use of `Cow<..>` is to avoid allocations in the common -/// case of constant strings +/// case of constant strings. Dynamically created label strings are shared when +/// [`Label`] values are cloned. /// /// ```rust /// use datafusion_physical_expr_common::metrics::*; @@ -47,6 +49,7 @@ use super::{ /// .with_new_label("filename", "my_awesome_file.parquet") /// .counter("num_bytes", partition); /// ``` +#[derive(Clone)] pub struct MetricBuilder<'a> { /// Location that the metric created by this builder will be added do metrics: &'a ExecutionPlanMetricsSet, @@ -108,7 +111,10 @@ impl<'a> MetricBuilder<'a> { name: impl Into>, value: impl Into>, ) -> Self { - self.with_label(Label::new(name.into(), value.into())) + self.with_label(Label::new( + LabelValue::from(name.into()), + LabelValue::from(value.into()), + )) } /// Set the partition of the metric being constructed diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index eecd8cfabd5eb..0a03075b91094 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -30,6 +30,7 @@ use parking_lot::Mutex; use std::{ borrow::Cow, fmt::{self, Debug, Display}, + hash::{Hash, Hasher}, sync::Arc, vec::IntoIter, }; @@ -519,20 +520,19 @@ impl From for ExecutionPlanMetricsSet { /// telemetry], /// etc. /// -/// As the name and value are expected to mostly be constant strings, -/// use a [`Cow`] to avoid copying / allocations in this common case. +/// As the name and value are expected to often be constant strings, borrowed +/// static strings avoid allocations in that common case. Dynamic strings are +/// stored behind [`Arc`] so cloning labels does not copy the underlying +/// string data. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Label { - name: Cow<'static, str>, - value: Cow<'static, str>, + name: LabelValue, + value: LabelValue, } impl Label { /// Create a new [`Label`] - pub fn new( - name: impl Into>, - value: impl Into>, - ) -> Self { + pub fn new(name: impl Into, value: impl Into) -> Self { let name = name.into(); let value = value.into(); Self { name, value } @@ -540,12 +540,12 @@ impl Label { /// Returns the name of this label pub fn name(&self) -> &str { - self.name.as_ref() + self.name.as_str() } /// Returns the value of this label pub fn value(&self) -> &str { - self.value.as_ref() + self.value.as_str() } } @@ -555,6 +555,89 @@ impl Display for Label { } } +/// A label name or value. +/// +/// String literals preserve the existing allocation-free path. Dynamic strings +/// can be stored behind [`Arc`], so cloning a [`Label`] only increments an +/// atomic reference count and does not allocate or copy the underlying string +/// data. +#[derive(Clone)] +pub struct LabelValue(LabelValueInner); + +/// Internal representation for label names and values. +/// +/// `LabelValue` is public because `Label::new` accepts it, but these storage +/// variants are implementation details. Keeping them private prevents external +/// code from constructing or matching on `Static` and `Shared` directly. +#[derive(Clone)] +enum LabelValueInner { + Static(&'static str), + Shared(Arc), +} + +impl LabelValue { + /// Return this label value as a string slice. + pub fn as_str(&self) -> &str { + match &self.0 { + LabelValueInner::Static(value) => value, + LabelValueInner::Shared(value) => value.as_ref(), + } + } +} + +impl From<&'static str> for LabelValue { + fn from(value: &'static str) -> Self { + Self(LabelValueInner::Static(value)) + } +} + +impl From for LabelValue { + fn from(value: String) -> Self { + Self(LabelValueInner::Shared(Arc::from(value))) + } +} + +impl From> for LabelValue { + fn from(value: Arc) -> Self { + Self(LabelValueInner::Shared(value)) + } +} + +impl From> for LabelValue { + fn from(value: Cow<'static, str>) -> Self { + match value { + Cow::Borrowed(value) => value.into(), + Cow::Owned(value) => value.into(), + } + } +} + +impl PartialEq for LabelValue { + fn eq(&self, other: &Self) -> bool { + self.as_str() == other.as_str() + } +} + +impl Eq for LabelValue {} + +impl Hash for LabelValue { + fn hash(&self, state: &mut H) { + self.as_str().hash(state); + } +} + +impl Debug for LabelValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Debug::fmt(self.as_str(), f) + } +} + +impl Display for LabelValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self.as_str(), f) + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -609,6 +692,18 @@ mod tests { assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string()) } + #[test] + fn test_label_owned_and_borrowed_values_are_equal() { + let borrowed = Label::new("foo", "bar"); + let owned = Label::new("foo".to_string(), "bar".to_string()); + let shared = Label::new("foo", Arc::::from("bar")); + + assert_eq!(borrowed, owned); + assert_eq!(borrowed, shared); + assert_eq!(borrowed.to_string(), owned.to_string()); + assert_eq!(borrowed.to_string(), shared.to_string()); + } + #[test] fn test_output_rows() { let metrics = ExecutionPlanMetricsSet::new();