Skip to content

Commit fc7d5ce

Browse files
committed
fix: report fully matched page-index skips
1 parent 0881481 commit fc7d5ce

10 files changed

Lines changed: 213 additions & 104 deletions

File tree

Cargo.lock

Lines changed: 98 additions & 82 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ impl ParquetAccessPlan {
203203
self.should_scan(idx) && self.fully_matched[idx]
204204
}
205205

206+
/// Returns the fully matched row group flags.
207+
pub(crate) fn fully_matched(&self) -> &Vec<bool> {
208+
&self.fully_matched
209+
}
210+
206211
/// Return true if any scanned row group is fully matched.
207212
fn has_fully_matched(&self) -> bool {
208213
self.row_group_index_iter()

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use datafusion_physical_plan::metrics::{
1919
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType,
2020
PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
2121
};
22+
use std::sync::{Arc, OnceLock};
2223

2324
/// Stores metrics about the parquet execution for a particular parquet file.
2425
///
@@ -67,6 +68,11 @@ pub struct ParquetFileMetrics {
6768
pub page_index_rows_pruned: PruningMetrics,
6869
/// Total pages filtered or matched by parquet page index
6970
pub page_index_pages_pruned: PruningMetrics,
71+
/// Lazily registered counter for pages whose page-index pruning was skipped
72+
/// because the containing row group was fully matched by row-group statistics.
73+
///
74+
/// These pages are still scanned; only page-index predicate evaluation is skipped.
75+
page_index_pages_skipped_by_fully_matched: Arc<OnceLock<Count>>,
7076
/// Total time spent evaluating parquet page index filters
7177
pub page_index_eval_time: Time,
7278
/// Total time spent reading and parsing metadata from the footer
@@ -91,6 +97,10 @@ pub struct ParquetFileMetrics {
9197
/// number of rows that were stored in the cache after evaluating predicates
9298
/// reused for the output.
9399
pub predicate_cache_records: Gauge,
100+
101+
partition: usize,
102+
filename: String,
103+
metrics: ExecutionPlanMetricsSet,
94104
}
95105

96106
impl ParquetFileMetrics {
@@ -204,13 +214,36 @@ impl ParquetFileMetrics {
204214
row_pushdown_eval_time,
205215
page_index_rows_pruned,
206216
page_index_pages_pruned,
217+
page_index_pages_skipped_by_fully_matched: Arc::new(OnceLock::new()),
207218
statistics_eval_time,
208219
bloom_filter_eval_time,
209220
page_index_eval_time,
210221
metadata_load_time,
211222
scan_efficiency_ratio,
212223
predicate_cache_inner_records,
213224
predicate_cache_records,
225+
partition,
226+
filename: filename.to_string(),
227+
metrics: metrics.clone(),
228+
}
229+
}
230+
231+
/// Record pages whose page-index pruning was skipped because the containing
232+
/// row group was fully matched by row-group statistics.
233+
pub(crate) fn add_page_index_pages_skipped_by_fully_matched(&self, n: usize) {
234+
if n == 0 {
235+
return;
214236
}
237+
238+
let count = self
239+
.page_index_pages_skipped_by_fully_matched
240+
.get_or_init(|| {
241+
MetricBuilder::new(&self.metrics)
242+
.with_new_label("filename", self.filename.clone())
243+
.with_type(MetricType::Summary)
244+
.with_category(MetricCategory::Rows)
245+
.counter("page_index_pages_skipped_by_fully_matched", self.partition)
246+
});
247+
count.add(n);
215248
}
216249
}

datafusion/datasource-parquet/src/page_filter.rs

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_pruning::PruningPredicate;
3535

3636
use log::{debug, trace};
3737
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
38+
use parquet::arrow::parquet_column;
3839
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
3940
use parquet::file::page_index::offset_index::PageLocation;
4041
use parquet::schema::types::SchemaDescriptor;
@@ -193,21 +194,24 @@ impl PagePruningAccessPlanFilter {
193194
let mut total_pages_skip = 0;
194195
// track the total number of pages that should not be skipped
195196
let mut total_pages_select = 0;
196-
// track rows that were already proven fully matched at row group
197-
// level and therefore did not need page-index predicate evaluation
198-
let mut total_rows_fully_matched = 0;
197+
// track pages for which page-index pruning was skipped because the
198+
// containing row group was already proven fully matched by statistics
199+
let mut total_pages_skipped_by_fully_matched = 0;
199200

200201
// for each row group specified in the access plan
201202
let row_group_indexes = access_plan.row_group_indexes();
202203
for row_group_index in row_group_indexes {
203204
// Skip page pruning for fully matched row groups: all rows are
204205
// known to satisfy the predicate, so page-level pruning is wasted work.
205206
if access_plan.is_fully_matched(row_group_index) {
206-
// Page metrics count evaluated page-index pruning work; this
207-
// branch only records rows already proven fully matched.
208-
let row_count = groups[row_group_index].num_rows() as usize;
209-
total_select += row_count;
210-
total_rows_fully_matched += row_count;
207+
let page_count = fully_matched_page_count(
208+
row_group_index,
209+
page_index_predicates,
210+
arrow_schema,
211+
parquet_schema,
212+
parquet_metadata,
213+
);
214+
total_pages_skipped_by_fully_matched += page_count;
211215

212216
continue;
213217
}
@@ -218,10 +222,13 @@ impl PagePruningAccessPlanFilter {
218222
let mut matched_pages_in_group: Option<HashSet<usize>> = None;
219223

220224
for predicate in page_index_predicates {
221-
let column = predicate
222-
.required_columns()
223-
.single_column()
224-
.expect("Page pruning requires single column predicates");
225+
let Some(column) = predicate.required_columns().single_column() else {
226+
debug!(
227+
"Ignoring multi-column page pruning predicate: {:?}",
228+
predicate.predicate_expr()
229+
);
230+
continue;
231+
};
225232

226233
let converter = StatisticsConverter::try_new(
227234
column.name(),
@@ -318,15 +325,15 @@ impl PagePruningAccessPlanFilter {
318325
file_metrics
319326
.page_index_rows_pruned
320327
.add_matched(total_select);
321-
file_metrics
322-
.page_index_rows_pruned
323-
.add_fully_matched(total_rows_fully_matched);
324328
file_metrics
325329
.page_index_pages_pruned
326330
.add_pruned(total_pages_skip);
327331
file_metrics
328332
.page_index_pages_pruned
329333
.add_matched(total_pages_select);
334+
file_metrics.add_page_index_pages_skipped_by_fully_matched(
335+
total_pages_skipped_by_fully_matched,
336+
);
330337
access_plan
331338
}
332339

@@ -346,6 +353,45 @@ fn update_selection(
346353
}
347354
}
348355

356+
/// Returns the number of pages for which page-index pruning is skipped because
357+
/// the containing row group is fully matched by row-group statistics.
358+
fn fully_matched_page_count(
359+
row_group_index: usize,
360+
page_index_predicates: &[PruningPredicate],
361+
arrow_schema: &Schema,
362+
parquet_schema: &SchemaDescriptor,
363+
parquet_metadata: &ParquetMetaData,
364+
) -> usize {
365+
let Some(offset_index) = parquet_metadata.offset_index() else {
366+
return 0;
367+
};
368+
369+
let Some(row_group_offsets) = offset_index.get(row_group_index) else {
370+
return 0;
371+
};
372+
373+
for predicate in page_index_predicates {
374+
let Some(column) = predicate.required_columns().single_column() else {
375+
continue;
376+
};
377+
378+
let Some((parquet_column_index, _)) =
379+
parquet_column(parquet_schema, arrow_schema, column.name())
380+
else {
381+
continue;
382+
};
383+
384+
let Some(offset_index_metadata) = row_group_offsets.get(parquet_column_index)
385+
else {
386+
continue;
387+
};
388+
389+
return offset_index_metadata.page_locations().len();
390+
}
391+
392+
0
393+
}
394+
349395
/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to a vec of
350396
/// booleans that state if each page was matched (true) or not (false).
351397
///

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl RowGroupAccessPlanFilter {
7474
self.access_plan
7575
}
7676

77+
/// Returns the is_fully_matched vector.
78+
pub fn is_fully_matched(&self) -> &Vec<bool> {
79+
self.access_plan.fully_matched()
80+
}
81+
7782
/// Prunes the access plan based on the limit and fully contained row groups.
7883
///
7984
/// The pruning works by leveraging the concept of fully matched row groups. Consider a query like:

datafusion/physical-expr-common/src/metrics/value.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,10 @@ impl MetricValue {
10101010
Self::SpilledBytes(_) => 11,
10111011
Self::SpilledRows(_) => 12,
10121012
Self::CurrentMemoryUsage(_) => 13,
1013-
Self::Count { .. } => 14,
1013+
Self::Count { name, .. } => match name.as_ref() {
1014+
"page_index_pages_skipped_by_fully_matched" => 8,
1015+
_ => 14,
1016+
},
10141017
Self::Gauge { .. } => 15,
10151018
Self::Time { .. } => 16,
10161019
Self::Ratio { .. } => 17,

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ Plan with Metrics
104104
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
105105
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
106106
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
107-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]
107+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]
108108

109109
statement ok
110110
set datafusion.explain.analyze_level = dev;

datafusion/sqllogictest/test_files/explain_analyze.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
247247
----
248248
Plan with Metrics
249249
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3]
250-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
250+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
251251

252252
statement ok
253253
reset datafusion.explain.analyze_categories;
@@ -262,7 +262,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
262262
----
263263
Plan with Metrics
264264
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
265-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
265+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
266266

267267
statement ok
268268
reset datafusion.explain.analyze_categories;
@@ -277,7 +277,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
277277
----
278278
Plan with Metrics
279279
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
280-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
280+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
281281

282282
statement ok
283283
reset datafusion.explain.analyze_categories;

0 commit comments

Comments
 (0)