Skip to content

Commit eb2ee9b

Browse files
committed
Lazy register parquet file metrics
1 parent c8b784a commit eb2ee9b

8 files changed

Lines changed: 587 additions & 149 deletions

File tree

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async fn skip_all() {
8484
.await;
8585

8686
// Verify that skipping all row groups skips reading any data at all
87-
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
87+
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap_or(0);
8888
assert_eq!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
8989
}
9090

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,17 @@ impl<'a> TestCase<'a> {
544544
PushdownExpected::None
545545
};
546546

547-
let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
547+
let pushdown_rows_pruned = match pushdown_expected {
548+
PushdownExpected::None => {
549+
metric_value_or_zero(&metrics, "pushdown_rows_pruned")
550+
}
551+
PushdownExpected::Some => {
552+
expect_metric_value(&metrics, "pushdown_rows_pruned")
553+
}
554+
};
555+
let pushdown_rows_matched =
556+
metric_value_or_zero(&metrics, "pushdown_rows_matched");
548557
println!(" pushdown_rows_pruned: {pushdown_rows_pruned}");
549-
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
550558
println!(" pushdown_rows_matched: {pushdown_rows_matched}");
551559

552560
match pushdown_expected {
@@ -562,11 +570,6 @@ impl<'a> TestCase<'a> {
562570
}
563571
};
564572

565-
let (page_index_rows_pruned, page_index_rows_matched) =
566-
get_pruning_metrics(&metrics, "page_index_rows_pruned");
567-
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
568-
println!(" page_index_rows_matched: {page_index_rows_matched}");
569-
570573
let page_index_filtering_expected = if scan_options.enable_page_index {
571574
self.page_index_filtering_expected
572575
} else {
@@ -577,9 +580,17 @@ impl<'a> TestCase<'a> {
577580

578581
match page_index_filtering_expected {
579582
PageIndexFilteringExpected::None => {
583+
let (page_index_rows_pruned, page_index_rows_matched) =
584+
pruning_metrics_or_zero(&metrics, "page_index_rows_pruned");
585+
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
586+
println!(" page_index_rows_matched: {page_index_rows_matched}");
580587
assert_eq!(page_index_rows_pruned, 0);
581588
}
582589
PageIndexFilteringExpected::Some => {
590+
let (page_index_rows_pruned, page_index_rows_matched) =
591+
expect_pruning_metrics(&metrics, "page_index_rows_pruned");
592+
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
593+
println!(" page_index_rows_matched: {page_index_rows_matched}");
583594
assert!(
584595
page_index_rows_pruned > 0,
585596
"Expected to filter rows via page index but none were",
@@ -591,29 +602,55 @@ impl<'a> TestCase<'a> {
591602
}
592603
}
593604

594-
fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
605+
/// Returns the `(pruned, matched)` counts of the pruning metric `metric_name`,
/// or `(0, 0)` when `metrics` contains no metric with that name.
///
/// Delegates to `get_pruning_metrics` with `allow_missing = true`.
///
/// # Panics
///
/// Panics if the metric exists but is not a pruning metric.
fn pruning_metrics_or_zero(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
606+
get_pruning_metrics(metrics, metric_name, true)
607+
}
608+
609+
/// Returns the `(pruned, matched)` counts of the pruning metric `metric_name`,
/// requiring the metric to be present in `metrics`.
///
/// Delegates to `get_pruning_metrics` with `allow_missing = false`.
///
/// # Panics
///
/// Panics if the metric is absent, or if it exists but is not a pruning metric.
fn expect_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
610+
get_pruning_metrics(metrics, metric_name, false)
611+
}
612+
613+
/// Looks up `metric_name` via `metrics.sum_by_name` and returns its
/// `(pruned, matched)` counts.
///
/// When no metric with that name exists, `allow_missing` decides the outcome:
/// `true` yields `(0, 0)`, `false` panics.
///
/// # Panics
///
/// Panics if the metric exists but is not a `MetricValue::PruningMetrics`, or
/// if it is absent and `allow_missing` is `false`. Both messages include the
/// debug dump of `metrics`.
fn get_pruning_metrics(
614+
metrics: &MetricsSet,
615+
metric_name: &str,
616+
allow_missing: bool,
617+
) -> (usize, usize) {
595618
match metrics.sum_by_name(metric_name) {
596619
Some(MetricValue::PruningMetrics {
597620
pruning_metrics, ..
598621
}) => (pruning_metrics.pruned(), pruning_metrics.matched()),
599622
Some(_) => {
600623
panic!("Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}")
601624
}
602-
None => panic!(
603-
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
604-
),
625+
// An absent metric is reported as zero counts when the caller opted in.
None if allow_missing => (0, 0),
626+
None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
605627
}
606628
}
607629

608-
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
630+
/// Returns the value of the metric `metric_name` as a `usize`, or `0` when
/// `metrics` contains no metric with that name.
///
/// Delegates to `get_value` with `allow_missing = true`.
fn metric_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize {
631+
get_value(metrics, metric_name, true)
632+
}
633+
634+
/// Returns the value of the metric `metric_name` as a `usize`, requiring the
/// metric to be present in `metrics`.
///
/// Delegates to `get_value` with `allow_missing = false`.
///
/// # Panics
///
/// Panics if `metrics` contains no metric with that name.
fn expect_metric_value(metrics: &MetricsSet, metric_name: &str) -> usize {
635+
get_value(metrics, metric_name, false)
636+
}
637+
638+
/// Returns the value of the metric `metric_name`, tolerating its absence only
/// when the caller expects a value of zero.
///
/// With `expected == 0` a missing metric is read as `0`; with any other
/// `expected` a missing metric panics (see `get_value`).
fn metric_value_for_expected(
639+
metrics: &MetricsSet,
640+
metric_name: &str,
641+
expected: usize,
642+
) -> usize {
643+
// `expected == 0` doubles as the `allow_missing` flag.
get_value(metrics, metric_name, expected == 0)
644+
}
645+
646+
/// Looks up `metric_name` via `metrics.sum_by_name` and returns it as a `usize`.
///
/// For a `MetricValue::PruningMetrics` the pruned count is returned; any other
/// metric kind is converted with `as_usize()`. When no metric with that name
/// exists, `allow_missing` decides the outcome: `true` yields `0`, `false`
/// panics.
///
/// # Panics
///
/// Panics if the metric is absent and `allow_missing` is `false`; the message
/// includes the debug dump of `metrics`.
fn get_value(metrics: &MetricsSet, metric_name: &str, allow_missing: bool) -> usize {
609647
match metrics.sum_by_name(metric_name) {
610648
Some(MetricValue::PruningMetrics {
611649
pruning_metrics, ..
612650
}) => pruning_metrics.pruned(),
613651
Some(v) => v.as_usize(),
614-
None => panic!(
615-
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
616-
),
652+
// An absent metric is reported as zero when the caller opted in.
None if allow_missing => 0,
653+
None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
617654
}
618655
}
619656

@@ -736,11 +773,19 @@ impl PredicateCacheTest {
736773

737774
// verify the predicate cache metrics
738775
assert_eq!(
739-
get_value(&metrics, "predicate_cache_inner_records"),
776+
metric_value_for_expected(
777+
&metrics,
778+
"predicate_cache_inner_records",
779+
expected_inner_records
780+
),
740781
expected_inner_records
741782
);
742783
assert_eq!(
743-
get_value(&metrics, "predicate_cache_records"),
784+
metric_value_for_expected(
785+
&metrics,
786+
"predicate_cache_records",
787+
expected_records
788+
),
744789
expected_records
745790
);
746791
Ok(())

datafusion/core/tests/parquet/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,20 @@ impl TestOutput {
267267
}
268268
}
269269

270+
/// Normalizes an observed metric value before it is compared with `expected`.
///
/// When `expected` is `Some(0)`, a missing observation (`None`) is turned into
/// `Some(0)`. For every other `expected` (including `None`), `actual` is
/// returned unchanged, so a missing metric still fails a comparison against a
/// non-zero expectation.
pub(crate) fn zero_if_metric_absent(
271+
actual: Option<usize>,
272+
expected: Option<usize>,
273+
) -> Option<usize> {
274+
// Lazy parquet metrics may represent zero as an absent metric. Tests that
275+
// specifically enforce lazy-skipping zero metrics live with the metrics
276+
// implementation; these integration tests only compare scan behavior.
277+
if expected == Some(0) {
278+
Some(actual.unwrap_or(0))
279+
} else {
280+
actual
281+
}
282+
}
283+
270284
/// Creates an execution context that has an external table "t"
271285
/// registered pointing at a parquet file made with `make_test_file`
272286
/// and the appropriate scenario

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use crate::parquet::Unit::Page;
21-
use crate::parquet::{ContextWithParquet, Scenario};
21+
use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent};
2222

2323
use arrow::array::{Int32Array, RecordBatch};
2424
use arrow::datatypes::{DataType, Field, Schema};
@@ -238,8 +238,14 @@ async fn test_prune(
238238
.await;
239239

240240
println!("{}", output.description());
241-
assert_eq!(output.predicate_evaluation_errors(), expected_errors);
242-
assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned);
241+
assert_eq!(
242+
zero_if_metric_absent(output.predicate_evaluation_errors(), expected_errors),
243+
expected_errors
244+
);
245+
assert_eq!(
246+
zero_if_metric_absent(output.row_pages_pruned(), expected_row_pages_pruned),
247+
expected_row_pages_pruned
248+
);
243249
assert_eq!(
244250
output.result_rows,
245251
expected_results,
@@ -360,7 +366,10 @@ async fn prune_date64() {
360366

361367
println!("{}", output.description());
362368
// This should prune out groups without error
363-
assert_eq!(output.predicate_evaluation_errors(), Some(0));
369+
assert_eq!(
370+
zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)),
371+
Some(0)
372+
);
364373
assert_eq!(output.row_pages_pruned(), Some(15));
365374
assert_eq!(output.result_rows, 1, "{}", output.description());
366375
}

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, ScalarValue};
2727
use itertools::Itertools;
2828

2929
use crate::parquet::Unit::RowGroup;
30-
use crate::parquet::{ContextWithParquet, Scenario};
30+
use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent};
3131
use datafusion_expr::{col, lit};
3232
struct RowGroupPruningTest {
3333
scenario: Scenario,
@@ -136,33 +136,51 @@ impl RowGroupPruningTest {
136136

137137
println!("{}", output.description());
138138
assert_eq!(
139-
output.predicate_evaluation_errors(),
139+
zero_if_metric_absent(
140+
output.predicate_evaluation_errors(),
141+
self.expected_errors
142+
),
140143
self.expected_errors,
141144
"mismatched predicate_evaluation error"
142145
);
143146
assert_eq!(
144-
output.row_groups_matched_statistics(),
147+
zero_if_metric_absent(
148+
output.row_groups_matched_statistics(),
149+
self.expected_row_group_matched_by_statistics
150+
),
145151
self.expected_row_group_matched_by_statistics,
146152
"mismatched row_groups_matched_statistics",
147153
);
148154
assert_eq!(
149-
output.row_groups_pruned_statistics(),
155+
zero_if_metric_absent(
156+
output.row_groups_pruned_statistics(),
157+
self.expected_row_group_pruned_by_statistics
158+
),
150159
self.expected_row_group_pruned_by_statistics,
151160
"mismatched row_groups_pruned_statistics",
152161
);
153162
assert_eq!(
154-
output.files_ranges_pruned_statistics(),
163+
zero_if_metric_absent(
164+
output.files_ranges_pruned_statistics(),
165+
self.expected_files_pruned_by_statistics
166+
),
155167
self.expected_files_pruned_by_statistics,
156168
"mismatched files_ranges_pruned_statistics",
157169
);
158170
let bloom_filter_metrics = output.row_groups_bloom_filter();
159171
assert_eq!(
160-
bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()),
172+
zero_if_metric_absent(
173+
bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()),
174+
self.expected_row_group_matched_by_bloom_filter
175+
),
161176
self.expected_row_group_matched_by_bloom_filter,
162177
"mismatched row_groups_matched_bloom_filter",
163178
);
164179
assert_eq!(
165-
bloom_filter_metrics.map(|pm| pm.total_pruned()),
180+
zero_if_metric_absent(
181+
bloom_filter_metrics.map(|pm| pm.total_pruned()),
182+
self.expected_row_group_pruned_by_bloom_filter
183+
),
166184
self.expected_row_group_pruned_by_bloom_filter,
167185
"mismatched row_groups_pruned_bloom_filter",
168186
);
@@ -196,32 +214,50 @@ impl RowGroupPruningTest {
196214

197215
println!("{}", output.description());
198216
assert_eq!(
199-
output.predicate_evaluation_errors(),
217+
zero_if_metric_absent(
218+
output.predicate_evaluation_errors(),
219+
self.expected_errors
220+
),
200221
self.expected_errors,
201222
"mismatched predicate_evaluation error"
202223
);
203224
assert_eq!(
204-
output.row_groups_matched_statistics(),
225+
zero_if_metric_absent(
226+
output.row_groups_matched_statistics(),
227+
self.expected_row_group_matched_by_statistics
228+
),
205229
self.expected_row_group_matched_by_statistics,
206230
"mismatched row_groups_matched_statistics",
207231
);
208232
assert_eq!(
209-
output.row_groups_fully_matched_statistics(),
233+
zero_if_metric_absent(
234+
output.row_groups_fully_matched_statistics(),
235+
self.expected_row_group_fully_matched_by_statistics
236+
),
210237
self.expected_row_group_fully_matched_by_statistics,
211238
"mismatched row_groups_fully_matched_statistics",
212239
);
213240
assert_eq!(
214-
output.row_groups_pruned_statistics(),
241+
zero_if_metric_absent(
242+
output.row_groups_pruned_statistics(),
243+
self.expected_row_group_pruned_by_statistics
244+
),
215245
self.expected_row_group_pruned_by_statistics,
216246
"mismatched row_groups_pruned_statistics",
217247
);
218248
assert_eq!(
219-
output.files_ranges_pruned_statistics(),
249+
zero_if_metric_absent(
250+
output.files_ranges_pruned_statistics(),
251+
self.expected_files_pruned_by_statistics
252+
),
220253
self.expected_files_pruned_by_statistics,
221254
"mismatched files_ranges_pruned_statistics",
222255
);
223256
assert_eq!(
224-
output.limit_pruned_row_groups(),
257+
zero_if_metric_absent(
258+
output.limit_pruned_row_groups(),
259+
self.expected_limit_pruned_row_groups
260+
),
225261
self.expected_limit_pruned_row_groups,
226262
"mismatched limit_pruned_row_groups",
227263
);
@@ -343,7 +379,10 @@ async fn prune_date64() {
343379

344380
println!("{}", output.description());
345381
// This should prune out groups without error
346-
assert_eq!(output.predicate_evaluation_errors(), Some(0));
382+
assert_eq!(
383+
zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)),
384+
Some(0)
385+
);
347386
// 'dates' table has 4 row groups, and only the first one is matched by the predicate
348387
assert_eq!(output.row_groups_matched_statistics(), Some(1));
349388
assert_eq!(output.row_groups_pruned_statistics(), Some(3));
@@ -383,9 +422,15 @@ async fn prune_disabled() {
383422
println!("{}", output.description());
384423

385424
// This should not prune any
386-
assert_eq!(output.predicate_evaluation_errors(), Some(0));
425+
assert_eq!(
426+
zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)),
427+
Some(0)
428+
);
387429
assert_eq!(output.row_groups_matched(), Some(4));
388-
assert_eq!(output.row_groups_pruned(), Some(0));
430+
assert_eq!(
431+
zero_if_metric_absent(output.row_groups_pruned(), Some(0)),
432+
Some(0)
433+
);
389434
assert_eq!(
390435
output.result_rows,
391436
expected_rows,

0 commit comments

Comments
 (0)