Skip to content

Commit c5db269

Browse files
committed
Lazy register parquet file metrics
1 parent c8b784a commit c5db269

14 files changed

Lines changed: 719 additions & 201 deletions

File tree

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -892,8 +892,11 @@ mod tests {
892892
.await;
893893
// There should be no predicate evaluation errors
894894
let metrics = rt.parquet_exec.metrics().unwrap();
895-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
896-
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
895+
assert_eq!(
896+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
897+
0
898+
);
899+
assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0);
897900
assert_eq!(rt.batches.unwrap().len(), 0);
898901

899902
// Predicate should prune no row groups
@@ -905,8 +908,11 @@ mod tests {
905908
.await;
906909
// There should be no predicate evaluation errors
907910
let metrics = rt.parquet_exec.metrics().unwrap();
908-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
909-
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
911+
assert_eq!(
912+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
913+
0
914+
);
915+
assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0);
910916
let read = rt
911917
.batches
912918
.unwrap()
@@ -934,7 +940,10 @@ mod tests {
934940
.await;
935941
// There should be no predicate evaluation errors
936942
let metrics = rt.parquet_exec.metrics().unwrap();
937-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
943+
assert_eq!(
944+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
945+
0
946+
);
938947
assert_eq!(rt.batches.unwrap().len(), 0);
939948

940949
// Predicate should prune no row groups
@@ -946,7 +955,10 @@ mod tests {
946955
.await;
947956
// There should be no predicate evaluation errors
948957
let metrics = rt.parquet_exec.metrics().unwrap();
949-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
958+
assert_eq!(
959+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
960+
0
961+
);
950962
let read = rt
951963
.batches
952964
.unwrap()
@@ -986,15 +998,21 @@ mod tests {
986998
.await;
987999
// There should be no predicate evaluation errors and we keep 1 row
9881000
let metrics = rt.parquet_exec.metrics().unwrap();
989-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
1001+
assert_eq!(
1002+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
1003+
0
1004+
);
9901005
let read = rt
9911006
.batches
9921007
.unwrap()
9931008
.iter()
9941009
.map(|b| b.num_rows())
9951010
.sum::<usize>();
9961011
assert_eq!(read, 1, "Expected 1 rows to match the predicate");
997-
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
1012+
assert_eq!(
1013+
get_value_or_zero(&metrics, "row_groups_pruned_statistics"),
1014+
0
1015+
);
9981016
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
9991017
assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1);
10001018
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
@@ -1012,7 +1030,10 @@ mod tests {
10121030
.await;
10131031
// There should be no predicate evaluation errors and we keep 0 rows
10141032
let metrics = rt.parquet_exec.metrics().unwrap();
1015-
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
1033+
assert_eq!(
1034+
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
1035+
0
1036+
);
10161037
let read = rt
10171038
.batches
10181039
.unwrap()
@@ -2147,6 +2168,18 @@ mod tests {
21472168
}
21482169
}
21492170

2171+
fn get_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize {
2172+
metrics
2173+
.sum_by_name(metric_name)
2174+
.map(|v| match v {
2175+
MetricValue::PruningMetrics {
2176+
pruning_metrics, ..
2177+
} => pruning_metrics.pruned(),
2178+
_ => v.as_usize(),
2179+
})
2180+
.unwrap_or(0)
2181+
}
2182+
21502183
fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
21512184
match metrics.sum_by_name(metric_name) {
21522185
Some(MetricValue::PruningMetrics {

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async fn skip_all() {
8484
.await;
8585

8686
// Verify that skipping all row groups skips reading any data at all
87-
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
87+
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap_or(0);
8888
assert_eq!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
8989
}
9090

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,17 @@ impl<'a> TestCase<'a> {
544544
PushdownExpected::None
545545
};
546546

547-
let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
547+
let pushdown_rows_pruned = match pushdown_expected {
548+
PushdownExpected::None => {
549+
metric_value_or_zero(&metrics, "pushdown_rows_pruned")
550+
}
551+
PushdownExpected::Some => {
552+
expect_metric_value(&metrics, "pushdown_rows_pruned")
553+
}
554+
};
555+
let pushdown_rows_matched =
556+
metric_value_or_zero(&metrics, "pushdown_rows_matched");
548557
println!(" pushdown_rows_pruned: {pushdown_rows_pruned}");
549-
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
550558
println!(" pushdown_rows_matched: {pushdown_rows_matched}");
551559

552560
match pushdown_expected {
@@ -562,11 +570,6 @@ impl<'a> TestCase<'a> {
562570
}
563571
};
564572

565-
let (page_index_rows_pruned, page_index_rows_matched) =
566-
get_pruning_metrics(&metrics, "page_index_rows_pruned");
567-
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
568-
println!(" page_index_rows_matched: {page_index_rows_matched}");
569-
570573
let page_index_filtering_expected = if scan_options.enable_page_index {
571574
self.page_index_filtering_expected
572575
} else {
@@ -577,9 +580,17 @@ impl<'a> TestCase<'a> {
577580

578581
match page_index_filtering_expected {
579582
PageIndexFilteringExpected::None => {
583+
let (page_index_rows_pruned, page_index_rows_matched) =
584+
pruning_metrics_or_zero(&metrics, "page_index_rows_pruned");
585+
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
586+
println!(" page_index_rows_matched: {page_index_rows_matched}");
580587
assert_eq!(page_index_rows_pruned, 0);
581588
}
582589
PageIndexFilteringExpected::Some => {
590+
let (page_index_rows_pruned, page_index_rows_matched) =
591+
expect_pruning_metrics(&metrics, "page_index_rows_pruned");
592+
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
593+
println!(" page_index_rows_matched: {page_index_rows_matched}");
583594
assert!(
584595
page_index_rows_pruned > 0,
585596
"Expected to filter rows via page index but none were",
@@ -591,29 +602,55 @@ impl<'a> TestCase<'a> {
591602
}
592603
}
593604

594-
fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
605+
fn pruning_metrics_or_zero(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
606+
get_pruning_metrics(metrics, metric_name, true)
607+
}
608+
609+
fn expect_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
610+
get_pruning_metrics(metrics, metric_name, false)
611+
}
612+
613+
fn get_pruning_metrics(
614+
metrics: &MetricsSet,
615+
metric_name: &str,
616+
allow_missing: bool,
617+
) -> (usize, usize) {
595618
match metrics.sum_by_name(metric_name) {
596619
Some(MetricValue::PruningMetrics {
597620
pruning_metrics, ..
598621
}) => (pruning_metrics.pruned(), pruning_metrics.matched()),
599622
Some(_) => {
600623
panic!("Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}")
601624
}
602-
None => panic!(
603-
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
604-
),
625+
None if allow_missing => (0, 0),
626+
None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
605627
}
606628
}
607629

608-
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
630+
fn metric_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize {
631+
get_value(metrics, metric_name, true)
632+
}
633+
634+
fn expect_metric_value(metrics: &MetricsSet, metric_name: &str) -> usize {
635+
get_value(metrics, metric_name, false)
636+
}
637+
638+
fn metric_value_for_expected(
639+
metrics: &MetricsSet,
640+
metric_name: &str,
641+
expected: usize,
642+
) -> usize {
643+
get_value(metrics, metric_name, expected == 0)
644+
}
645+
646+
fn get_value(metrics: &MetricsSet, metric_name: &str, allow_missing: bool) -> usize {
609647
match metrics.sum_by_name(metric_name) {
610648
Some(MetricValue::PruningMetrics {
611649
pruning_metrics, ..
612650
}) => pruning_metrics.pruned(),
613651
Some(v) => v.as_usize(),
614-
None => panic!(
615-
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
616-
),
652+
None if allow_missing => 0,
653+
None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
617654
}
618655
}
619656

@@ -736,11 +773,19 @@ impl PredicateCacheTest {
736773

737774
// verify the predicate cache metrics
738775
assert_eq!(
739-
get_value(&metrics, "predicate_cache_inner_records"),
776+
metric_value_for_expected(
777+
&metrics,
778+
"predicate_cache_inner_records",
779+
expected_inner_records
780+
),
740781
expected_inner_records
741782
);
742783
assert_eq!(
743-
get_value(&metrics, "predicate_cache_records"),
784+
metric_value_for_expected(
785+
&metrics,
786+
"predicate_cache_records",
787+
expected_records
788+
),
744789
expected_records
745790
);
746791
Ok(())

datafusion/core/tests/parquet/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,20 @@ impl TestOutput {
267267
}
268268
}
269269

270+
pub(crate) fn zero_if_metric_absent(
271+
actual: Option<usize>,
272+
expected: Option<usize>,
273+
) -> Option<usize> {
274+
// Lazy parquet metrics may represent zero as an absent metric. Tests that
275+
// specifically enforce lazy-skipping zero metrics live with the metrics
276+
// implementation; these integration tests only compare scan behavior.
277+
if expected == Some(0) {
278+
Some(actual.unwrap_or(0))
279+
} else {
280+
actual
281+
}
282+
}
283+
270284
/// Creates an execution context that has an external table "t"
271285
/// registered pointing at a parquet file made with `make_test_file`
272286
/// and the appropriate scenario

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use crate::parquet::Unit::Page;
21-
use crate::parquet::{ContextWithParquet, Scenario};
21+
use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent};
2222

2323
use arrow::array::{Int32Array, RecordBatch};
2424
use arrow::datatypes::{DataType, Field, Schema};
@@ -238,8 +238,14 @@ async fn test_prune(
238238
.await;
239239

240240
println!("{}", output.description());
241-
assert_eq!(output.predicate_evaluation_errors(), expected_errors);
242-
assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned);
241+
assert_eq!(
242+
zero_if_metric_absent(output.predicate_evaluation_errors(), expected_errors),
243+
expected_errors
244+
);
245+
assert_eq!(
246+
zero_if_metric_absent(output.row_pages_pruned(), expected_row_pages_pruned),
247+
expected_row_pages_pruned
248+
);
243249
assert_eq!(
244250
output.result_rows,
245251
expected_results,
@@ -360,7 +366,10 @@ async fn prune_date64() {
360366

361367
println!("{}", output.description());
362368
// This should prune out groups without error
363-
assert_eq!(output.predicate_evaluation_errors(), Some(0));
369+
assert_eq!(
370+
zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)),
371+
Some(0)
372+
);
364373
assert_eq!(output.row_pages_pruned(), Some(15));
365374
assert_eq!(output.result_rows, 1, "{}", output.description());
366375
}

0 commit comments

Comments
 (0)