Skip to content

Commit 769ad4f

Browse files
Add internal repartition metrics
1 parent 937dfda commit 769ad4f

15 files changed

Lines changed: 676 additions & 57 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,7 @@ config_namespace! {
13851385
/// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
13861386
/// "summary" shows common metrics for high-level insights.
13871387
/// "dev" provides deep operator-level introspection for developers.
1388+
/// "internal" provides low-level kernel debugging metrics.
13881389
pub analyze_level: MetricType, default = MetricType::Dev
13891390

13901391
/// Which metric categories to include in "EXPLAIN ANALYZE" output.

datafusion/common/src/format.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ impl ConfigField for ExplainFormat {
210210
///
211211
/// The `datafusion.explain.analyze_level` configuration controls which
212212
/// type is shown:
213-
/// - `"dev"` (the default): all metrics are shown.
213+
/// - `"dev"` (the default): summary and developer-facing metrics are shown.
214+
/// - `"internal"`: all metrics are shown, including kernel debugging metrics.
214215
/// - `"summary"`: only metrics tagged as `Summary` are shown.
215216
///
216217
/// This is orthogonal to [`MetricCategory`], which filters by *what kind*
@@ -228,19 +229,30 @@ pub enum MetricType {
228229
Summary,
229230
/// For deep operator-level introspection for developers
230231
Dev,
232+
/// For low-level kernel debugging and DataFusion development
233+
Internal,
231234
}
232235

233236
impl MetricType {
234237
/// Returns the set of metric types that should be shown for this level.
235238
///
236-
/// `Dev` is a superset of `Summary`: when the user selects
237-
/// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown.
239+
/// `Dev` is a superset of `Summary`, and `Internal` is a superset of
240+
/// both: when the user selects `analyze_level = 'internal'`, `Summary`,
241+
/// `Dev`, and `Internal` metrics are shown.
238242
pub fn included_types(self) -> Vec<MetricType> {
239243
match self {
240244
MetricType::Summary => vec![MetricType::Summary],
241245
MetricType::Dev => vec![MetricType::Summary, MetricType::Dev],
246+
MetricType::Internal => {
247+
vec![MetricType::Summary, MetricType::Dev, MetricType::Internal]
248+
}
242249
}
243250
}
251+
252+
/// Returns true if `self` includes metrics tagged as `metric_type`.
253+
pub fn includes(self, metric_type: MetricType) -> bool {
254+
self.included_types().contains(&metric_type)
255+
}
244256
}
245257

246258
impl FromStr for MetricType {
@@ -250,8 +262,9 @@ impl FromStr for MetricType {
250262
match s.trim().to_lowercase().as_str() {
251263
"summary" => Ok(Self::Summary),
252264
"dev" => Ok(Self::Dev),
265+
"internal" => Ok(Self::Internal),
253266
other => Err(DataFusionError::Configuration(format!(
254-
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
267+
"Invalid explain analyze level. Expected 'summary', 'dev', or 'internal'. Got '{other}'"
255268
))),
256269
}
257270
}
@@ -262,6 +275,7 @@ impl Display for MetricType {
262275
match self {
263276
Self::Summary => write!(f, "summary"),
264277
Self::Dev => write!(f, "dev"),
278+
Self::Internal => write!(f, "internal"),
265279
}
266280
}
267281
}

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,25 @@ async fn collect_plan_with_categories(
236236
.to_string()
237237
}
238238

239+
async fn collect_plan_after_set(
240+
ctx: &SessionContext,
241+
level: &str,
242+
sql_str: &str,
243+
) -> String {
244+
ctx.sql(&format!("SET datafusion.explain.analyze_level = '{level}'"))
245+
.await
246+
.unwrap()
247+
.collect()
248+
.await
249+
.unwrap();
250+
251+
let dataframe = ctx.sql(sql_str).await.unwrap();
252+
let batches = dataframe.collect().await.unwrap();
253+
arrow::util::pretty::pretty_format_batches(&batches)
254+
.unwrap()
255+
.to_string()
256+
}
257+
239258
async fn collect_plan(sql_str: &str, level: MetricType) -> String {
240259
let ctx = SessionContext::new();
241260
collect_plan_with_context(sql_str, &ctx, level).await
@@ -257,6 +276,10 @@ async fn explain_analyze_level() {
257276
(MetricType::Dev, "output_rows", true),
258277
(MetricType::Dev, "output_bytes", true),
259278
(MetricType::Dev, "output_batches", true),
279+
(MetricType::Internal, "spill_count", true),
280+
(MetricType::Internal, "output_rows", true),
281+
(MetricType::Internal, "output_bytes", true),
282+
(MetricType::Internal, "output_batches", true),
260283
] {
261284
let plan = collect_plan(sql, level).await;
262285
assert_eq!(
@@ -267,6 +290,33 @@ async fn explain_analyze_level() {
267290
}
268291
}
269292

293+
#[tokio::test]
294+
async fn explain_analyze_internal_repartition_metrics() {
295+
let ctx = SessionContext::new_with_config(
296+
SessionConfig::new()
297+
.with_target_partitions(4)
298+
.with_batch_size(4096),
299+
);
300+
register_aggregate_csv_by_sql(&ctx).await;
301+
302+
let sql = "EXPLAIN ANALYZE \
303+
SELECT c1, count(*) \
304+
FROM aggregate_test_100 \
305+
GROUP BY c1";
306+
307+
let dev_plan = collect_plan_after_set(&ctx, "dev", sql).await;
308+
assert_contains!(&dev_plan, "RepartitionExec");
309+
assert_not_contains!(&dev_plan, "hash_compute_time");
310+
assert_not_contains!(&dev_plan, "route_time");
311+
assert_not_contains!(&dev_plan, "batch_build_time");
312+
313+
let internal_plan = collect_plan_after_set(&ctx, "internal", sql).await;
314+
assert_contains!(&internal_plan, "RepartitionExec");
315+
assert_contains!(&internal_plan, "hash_compute_time");
316+
assert_contains!(&internal_plan, "route_time");
317+
assert_contains!(&internal_plan, "batch_build_time");
318+
}
319+
270320
#[tokio::test]
271321
async fn explain_analyze_level_datasource_parquet() {
272322
let table_name = "tpch_lineitem_small";
@@ -284,6 +334,8 @@ async fn explain_analyze_level_datasource_parquet() {
284334
(MetricType::Summary, "page_index_eval_time", false),
285335
(MetricType::Dev, "metadata_load_time", true),
286336
(MetricType::Dev, "page_index_eval_time", true),
337+
(MetricType::Internal, "metadata_load_time", true),
338+
(MetricType::Internal, "page_index_eval_time", true),
287339
] {
288340
let plan = collect_plan_with_context(&sql, &ctx, level).await;
289341

datafusion/physical-expr-common/src/metrics/builder.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,19 @@ impl<'a> MetricBuilder<'a> {
117117
self
118118
}
119119

120+
/// Switches to a conditional builder for metrics controlled by
121+
/// `datafusion.explain.analyze_level`.
122+
///
123+
/// The returned builder preserves this builder's labels, category, and
124+
/// [`MetricType`], but its terminal methods return `Option<T>` and skip
125+
/// registration when `enabled_level` does not include this builder's
126+
/// [`MetricType`].
127+
pub fn if_enabled(self, enabled_level: MetricType) -> ConditionalMetricBuilder<'a> {
128+
ConditionalMetricBuilder {
129+
builder: enabled_level.includes(self.metric_type).then_some(self),
130+
}
131+
}
132+
120133
/// Consume self and create a metric of the specified value
121134
/// registered with the MetricsSet
122135
pub fn build(self, value: MetricValue) {
@@ -333,3 +346,51 @@ impl<'a> MetricBuilder<'a> {
333346
ratio_metrics
334347
}
335348
}
349+
350+
/// A conditional metric builder used after [`MetricBuilder::if_enabled`].
351+
///
352+
/// This type intentionally mirrors a subset of [`MetricBuilder`], but its
353+
/// terminal methods return `Option<T>`.
354+
/// - `Some(T)` means the metric was registered and can be updated.
355+
/// - `None` means the configured metric level does not include this metric,
356+
/// so the caller should do no metric work.
357+
pub struct ConditionalMetricBuilder<'a> {
358+
builder: Option<MetricBuilder<'a>>,
359+
}
360+
361+
impl<'a> ConditionalMetricBuilder<'a> {
362+
/// Add a label to the metric being constructed, if this builder is enabled.
363+
pub fn with_label(mut self, label: Label) -> Self {
364+
self.builder = self.builder.map(|builder| builder.with_label(label));
365+
self
366+
}
367+
368+
/// Set the semantic category for the metric being constructed, if this
369+
/// builder is enabled.
370+
pub fn with_category(mut self, category: MetricCategory) -> Self {
371+
self.builder = self.builder.map(|builder| builder.with_category(category));
372+
self
373+
}
374+
375+
/// Consume self and conditionally create a new timer for recording some
376+
/// subset of an operator's execution time.
377+
pub fn subset_time(
378+
self,
379+
subset_name: impl Into<Cow<'static, str>>,
380+
partition: usize,
381+
) -> Option<Time> {
382+
self.builder
383+
.map(|builder| builder.subset_time(subset_name, partition))
384+
}
385+
386+
/// Consume self and conditionally create a new counter for recording some
387+
/// arbitrary metric of an operator.
388+
pub fn counter(
389+
self,
390+
counter_name: impl Into<Cow<'static, str>>,
391+
partition: usize,
392+
) -> Option<Count> {
393+
self.builder
394+
.map(|builder| builder.counter(counter_name, partition))
395+
}
396+
}

datafusion/physical-expr-common/src/metrics/mod.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,61 @@ mod tests {
623623
assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
624624
}
625625

626+
#[test]
627+
fn test_conditional_metric_builder_uses_metric_type_level() {
628+
for (enabled_level, metric_type, should_register) in [
629+
(MetricType::Summary, MetricType::Summary, true),
630+
(MetricType::Summary, MetricType::Dev, false),
631+
(MetricType::Summary, MetricType::Internal, false),
632+
(MetricType::Dev, MetricType::Summary, true),
633+
(MetricType::Dev, MetricType::Dev, true),
634+
(MetricType::Dev, MetricType::Internal, false),
635+
(MetricType::Internal, MetricType::Summary, true),
636+
(MetricType::Internal, MetricType::Dev, true),
637+
(MetricType::Internal, MetricType::Internal, true),
638+
] {
639+
let metrics = ExecutionPlanMetricsSet::new();
640+
let metric = MetricBuilder::new(&metrics)
641+
.with_type(metric_type)
642+
.if_enabled(enabled_level)
643+
.subset_time("conditional_time", 0);
644+
645+
assert_eq!(
646+
metric.is_some(),
647+
should_register,
648+
"enabled_level={enabled_level:?} metric_type={metric_type:?}"
649+
);
650+
assert_eq!(
651+
metrics
652+
.clone_inner()
653+
.sum_by_name("conditional_time")
654+
.is_some(),
655+
should_register,
656+
"enabled_level={enabled_level:?} metric_type={metric_type:?}"
657+
);
658+
}
659+
660+
let metrics = ExecutionPlanMetricsSet::new();
661+
let label = Label::new("label", "value");
662+
let metric = MetricBuilder::new(&metrics)
663+
.with_type(MetricType::Internal)
664+
.with_category(MetricCategory::Timing)
665+
.if_enabled(MetricType::Internal)
666+
.with_label(label.clone())
667+
.subset_time("internal_time", 0);
668+
669+
assert!(metric.is_some());
670+
671+
let metrics = metrics.clone_inner();
672+
let metric = metrics
673+
.iter()
674+
.find(|metric| metric.value().name() == "internal_time")
675+
.expect("registered internal metric");
676+
assert_eq!(metric.metric_type(), MetricType::Internal);
677+
assert_eq!(metric.metric_category(), Some(MetricCategory::Timing));
678+
assert_eq!(metric.labels(), &[label]);
679+
}
680+
626681
#[test]
627682
fn test_elapsed_compute() {
628683
let metrics = ExecutionPlanMetricsSet::new();

0 commit comments

Comments
 (0)