Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 42 additions & 36 deletions datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType,
PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
Count, ExecutionPlanMetricsSet, Gauge, Label, MetricBuilder, MetricCategory,
MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
};

/// Stores metrics about the parquet execution for a particular parquet file.
Expand Down Expand Up @@ -100,46 +102,51 @@ impl ParquetFileMetrics {
filename: &str,
metrics: &ExecutionPlanMetricsSet,
) -> Self {
// Share the filename label across all per-file metrics to avoid
// allocating the same filename string for each metric.
let filename_label = Label::new("filename", Arc::<str>::from(filename));
let builder = MetricBuilder::new(metrics).with_label(filename_label);

// -----------------------
// 'summary' level metrics
// -----------------------
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let row_groups_pruned_bloom_filter = builder
.clone()
.with_type(MetricType::Summary)
.pruning_metrics("row_groups_pruned_bloom_filter", partition);

let limit_pruned_row_groups = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let limit_pruned_row_groups = builder
.clone()
.with_type(MetricType::Summary)
.pruning_metrics("limit_pruned_row_groups", partition);

let row_groups_pruned_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let row_groups_pruned_statistics = builder
.clone()
.with_type(MetricType::Summary)
.pruning_metrics("row_groups_pruned_statistics", partition);

let page_index_pages_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let page_index_pages_pruned = builder
.clone()
.with_type(MetricType::Summary)
.pruning_metrics("page_index_pages_pruned", partition);

let bytes_scanned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let bytes_scanned = builder
.clone()
.with_type(MetricType::Summary)
.with_category(MetricCategory::Bytes)
.counter("bytes_scanned", partition);

let metadata_load_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let metadata_load_time = builder
.clone()
.with_type(MetricType::Summary)
.subset_time("metadata_load_time", partition);

let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
.with_type(MetricType::Summary)
.pruning_metrics("files_ranges_pruned_statistics", partition);

let scan_efficiency_ratio = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let scan_efficiency_ratio = builder
.clone()
.with_type(MetricType::Summary)
.ratio_metrics_with_strategy(
"scan_efficiency_ratio",
Expand All @@ -150,45 +157,44 @@ impl ParquetFileMetrics {
// -----------------------
// 'dev' level metrics
// -----------------------
let predicate_evaluation_errors = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let predicate_evaluation_errors = builder
.clone()
.with_category(MetricCategory::Rows)
.counter("predicate_evaluation_errors", partition);

let pushdown_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let pushdown_rows_pruned = builder
.clone()
.with_category(MetricCategory::Rows)
.counter("pushdown_rows_pruned", partition);
let pushdown_rows_matched = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let pushdown_rows_matched = builder
.clone()
.with_category(MetricCategory::Rows)
.counter("pushdown_rows_matched", partition);

let row_pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let row_pushdown_eval_time = builder
.clone()
.subset_time("row_pushdown_eval_time", partition);
let statistics_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let statistics_eval_time = builder
.clone()
.subset_time("statistics_eval_time", partition);
let bloom_filter_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let bloom_filter_eval_time = builder
.clone()
.subset_time("bloom_filter_eval_time", partition);

let page_index_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let page_index_eval_time = builder
.clone()
.subset_time("page_index_eval_time", partition);

let page_index_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let page_index_rows_pruned = builder
.clone()
.pruning_metrics("page_index_rows_pruned", partition);

let predicate_cache_inner_records = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let predicate_cache_inner_records = builder
.clone()
.with_category(MetricCategory::Rows)
.gauge("predicate_cache_inner_records", partition);

let predicate_cache_records = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
let predicate_cache_records = builder
.with_category(MetricCategory::Rows)
.gauge("predicate_cache_records", partition);

Expand Down
12 changes: 9 additions & 3 deletions datafusion/physical-expr-common/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use crate::metrics::{
};

use super::{
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
Count, ExecutionPlanMetricsSet, Gauge, Label, LabelValue, Metric, MetricValue, Time,
Timestamp,
};

/// Structure for constructing metrics, counters, timers, etc.
///
/// Note the use of `Cow<..>` is to avoid allocations in the common
/// case of constant strings
/// case of constant strings. Dynamically created label strings are shared when
/// [`Label`] values are cloned.
///
/// ```rust
/// use datafusion_physical_expr_common::metrics::*;
Expand All @@ -47,6 +49,7 @@ use super::{
/// .with_new_label("filename", "my_awesome_file.parquet")
/// .counter("num_bytes", partition);
/// ```
#[derive(Clone)]
pub struct MetricBuilder<'a> {
/// Location that the metric created by this builder will be added to
metrics: &'a ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -108,7 +111,10 @@ impl<'a> MetricBuilder<'a> {
name: impl Into<Cow<'static, str>>,
value: impl Into<Cow<'static, str>>,
) -> Self {
self.with_label(Label::new(name.into(), value.into()))
self.with_label(Label::new(
LabelValue::from(name.into()),
LabelValue::from(value.into()),
))
}

/// Set the partition of the metric being constructed
Expand Down
115 changes: 105 additions & 10 deletions datafusion/physical-expr-common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use parking_lot::Mutex;
use std::{
borrow::Cow,
fmt::{self, Debug, Display},
hash::{Hash, Hasher},
sync::Arc,
vec::IntoIter,
};
Expand Down Expand Up @@ -519,33 +520,32 @@ impl From<MetricsSet> for ExecutionPlanMetricsSet {
/// telemetry]<https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md>,
/// etc.
///
/// As the name and value are expected to mostly be constant strings,
/// use a [`Cow`] to avoid copying / allocations in this common case.
/// As the name and value are expected to often be constant strings, borrowed
/// static strings avoid allocations in that common case. Dynamic strings are
/// stored behind [`Arc<str>`] so cloning labels does not copy the underlying
/// string data.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
name: Cow<'static, str>,
value: Cow<'static, str>,
name: LabelValue,
value: LabelValue,
}

impl Label {
/// Create a new [`Label`]
pub fn new(
name: impl Into<Cow<'static, str>>,
value: impl Into<Cow<'static, str>>,
) -> Self {
pub fn new(name: impl Into<LabelValue>, value: impl Into<LabelValue>) -> Self {
let name = name.into();
let value = value.into();
Self { name, value }
}

/// Returns the name of this label
pub fn name(&self) -> &str {
self.name.as_ref()
self.name.as_str()
}

/// Returns the value of this label
pub fn value(&self) -> &str {
self.value.as_ref()
self.value.as_str()
}
}

Expand All @@ -555,6 +555,89 @@ impl Display for Label {
}
}

/// One component of a label: either its name or its value.
///
/// A value built from a string literal keeps borrowing the `'static` data, so
/// that path performs no allocation. A value built from a runtime string is
/// held behind an [`Arc<str>`]; cloning it (and therefore cloning a [`Label`])
/// bumps a reference count instead of copying the string bytes.
#[derive(Clone)]
pub struct LabelValue(LabelValueInner);

/// Private storage behind a [`LabelValue`].
///
/// `LabelValue` has to be public because `Label::new` takes it as an argument,
/// but how the text is stored is an implementation detail. Keeping this enum
/// private means outside code can neither build nor match on the variants.
#[derive(Clone)]
enum LabelValueInner {
    /// Text borrowed for the `'static` lifetime.
    Static(&'static str),
    /// Reference-counted text shared between clones.
    Shared(Arc<str>),
}

impl LabelValue {
    /// Borrow the stored text as a `&str`, regardless of how it is stored.
    pub fn as_str(&self) -> &str {
        let Self(storage) = self;
        match storage {
            LabelValueInner::Static(text) => text,
            LabelValueInner::Shared(text) => &text[..],
        }
    }
}

/// String literals are stored by reference; nothing is allocated.
impl From<&'static str> for LabelValue {
    fn from(literal: &'static str) -> Self {
        LabelValue(LabelValueInner::Static(literal))
    }
}

/// Owned strings are converted into shared, reference-counted storage.
impl From<String> for LabelValue {
    fn from(owned: String) -> Self {
        let shared: Arc<str> = owned.into();
        LabelValue(LabelValueInner::Shared(shared))
    }
}

/// An existing `Arc<str>` is adopted as-is.
impl From<Arc<str>> for LabelValue {
    fn from(shared: Arc<str>) -> Self {
        LabelValue(LabelValueInner::Shared(shared))
    }
}

/// A `Cow` is routed to the literal or owned conversion according to its variant.
impl From<Cow<'static, str>> for LabelValue {
    fn from(text: Cow<'static, str>) -> Self {
        match text {
            Cow::Borrowed(literal) => Self::from(literal),
            Cow::Owned(owned) => Self::from(owned),
        }
    }
}

/// Equality compares the text only; the storage variant is irrelevant.
impl PartialEq for LabelValue {
    fn eq(&self, rhs: &Self) -> bool {
        self.as_str() == rhs.as_str()
    }
}

impl Eq for LabelValue {}

/// Hashes the text only, so the hash agrees with [`PartialEq`] across variants.
impl Hash for LabelValue {
    fn hash<H>(&self, hasher: &mut H)
    where
        H: Hasher,
    {
        Hash::hash(self.as_str(), hasher)
    }
}

/// Formats exactly like the underlying `str` under `{:?}`.
impl Debug for LabelValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <str as Debug>::fmt(self.as_str(), f)
    }
}

/// Formats exactly like the underlying `str` under `{}`.
impl Display for LabelValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <str as Display>::fmt(self.as_str(), f)
    }
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down Expand Up @@ -609,6 +692,18 @@ mod tests {
assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
}

#[test]
fn test_label_owned_and_borrowed_values_are_equal() {
    // Reference label built purely from string literals.
    let from_literals = Label::new("foo", "bar");

    // The same logical label built from owned and from shared strings.
    let alternatives = [
        Label::new(String::from("foo"), String::from("bar")),
        Label::new("foo", Arc::<str>::from("bar")),
    ];

    // Equality must not depend on how the strings are stored ...
    for candidate in &alternatives {
        assert_eq!(&from_literals, candidate);
    }

    // ... and neither must the rendered form.
    for candidate in &alternatives {
        assert_eq!(from_literals.to_string(), candidate.to_string());
    }
}

#[test]
fn test_output_rows() {
let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading