Skip to content

Commit 18f2742

Browse files
committed
fix: report fully matched page-index skips
1 parent 0881481 commit 18f2742

10 files changed

Lines changed: 246 additions & 104 deletions

File tree

Cargo.lock

Lines changed: 98 additions & 82 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ impl ParquetAccessPlan {
203203
self.should_scan(idx) && self.fully_matched[idx]
204204
}
205205

206+
/// Returns the fully matched row group flags.
207+
pub(crate) fn fully_matched(&self) -> &Vec<bool> {
208+
&self.fully_matched
209+
}
210+
206211
/// Return true if any scanned row group is fully matched.
207212
fn has_fully_matched(&self) -> bool {
208213
self.row_group_index_iter()

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use datafusion_physical_plan::metrics::{
1919
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType,
2020
PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
2121
};
22+
use std::sync::{Arc, OnceLock};
2223

2324
/// Stores metrics about the parquet execution for a particular parquet file.
2425
///
@@ -67,6 +68,11 @@ pub struct ParquetFileMetrics {
6768
pub page_index_rows_pruned: PruningMetrics,
6869
/// Total pages filtered or matched by parquet page index
6970
pub page_index_pages_pruned: PruningMetrics,
71+
/// Lazily registered counter for pages whose page-index pruning was skipped
72+
/// because the containing row group was fully matched by row-group statistics.
73+
///
74+
/// These pages are still scanned; only page-index predicate evaluation is skipped.
75+
page_index_pages_skipped_by_fully_matched: LazyParquetSummaryCount,
7076
/// Total time spent evaluating parquet page index filters
7177
pub page_index_eval_time: Time,
7278
/// Total time spent reading and parsing metadata from the footer
@@ -204,6 +210,12 @@ impl ParquetFileMetrics {
204210
row_pushdown_eval_time,
205211
page_index_rows_pruned,
206212
page_index_pages_pruned,
213+
page_index_pages_skipped_by_fully_matched: LazyParquetSummaryCount::new(
214+
"page_index_pages_skipped_by_fully_matched",
215+
partition,
216+
filename,
217+
metrics,
218+
),
207219
statistics_eval_time,
208220
bloom_filter_eval_time,
209221
page_index_eval_time,
@@ -213,4 +225,58 @@ impl ParquetFileMetrics {
213225
predicate_cache_records,
214226
}
215227
}
228+
229+
/// Record pages whose page-index pruning was skipped because the containing
230+
/// row group was fully matched by row-group statistics.
231+
pub(crate) fn add_page_index_pages_skipped_by_fully_matched(&self, n: usize) {
232+
if n == 0 {
233+
return;
234+
}
235+
236+
self.page_index_pages_skipped_by_fully_matched.add(n);
237+
}
238+
}
239+
240+
#[derive(Debug, Clone)]
241+
struct LazyParquetSummaryCount {
242+
inner: Arc<LazyParquetSummaryCountInner>,
243+
}
244+
245+
#[derive(Debug)]
246+
struct LazyParquetSummaryCountInner {
247+
count: OnceLock<Count>,
248+
name: &'static str,
249+
partition: usize,
250+
filename: String,
251+
metrics: ExecutionPlanMetricsSet,
252+
}
253+
254+
impl LazyParquetSummaryCount {
255+
fn new(
256+
name: &'static str,
257+
partition: usize,
258+
filename: &str,
259+
metrics: &ExecutionPlanMetricsSet,
260+
) -> Self {
261+
Self {
262+
inner: Arc::new(LazyParquetSummaryCountInner {
263+
count: OnceLock::new(),
264+
name,
265+
partition,
266+
filename: filename.to_string(),
267+
metrics: metrics.clone(),
268+
}),
269+
}
270+
}
271+
272+
fn add(&self, n: usize) {
273+
let count = self.inner.count.get_or_init(|| {
274+
MetricBuilder::new(&self.inner.metrics)
275+
.with_new_label("filename", self.inner.filename.clone())
276+
.with_type(MetricType::Summary)
277+
.with_category(MetricCategory::Rows)
278+
.counter(self.inner.name, self.inner.partition)
279+
});
280+
count.add(n);
281+
}
216282
}

datafusion/datasource-parquet/src/page_filter.rs

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_pruning::PruningPredicate;
3535

3636
use log::{debug, trace};
3737
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
38+
use parquet::arrow::parquet_column;
3839
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
3940
use parquet::file::page_index::offset_index::PageLocation;
4041
use parquet::schema::types::SchemaDescriptor;
@@ -193,21 +194,24 @@ impl PagePruningAccessPlanFilter {
193194
let mut total_pages_skip = 0;
194195
// track the total number of pages that should not be skipped
195196
let mut total_pages_select = 0;
196-
// track rows that were already proven fully matched at row group
197-
// level and therefore did not need page-index predicate evaluation
198-
let mut total_rows_fully_matched = 0;
197+
// track pages for which page-index pruning was skipped because the
198+
// containing row group was already proven fully matched by statistics
199+
let mut total_pages_skipped_by_fully_matched = 0;
199200

200201
// for each row group specified in the access plan
201202
let row_group_indexes = access_plan.row_group_indexes();
202203
for row_group_index in row_group_indexes {
203204
// Skip page pruning for fully matched row groups: all rows are
204205
// known to satisfy the predicate, so page-level pruning is wasted work.
205206
if access_plan.is_fully_matched(row_group_index) {
206-
// Page metrics count evaluated page-index pruning work; this
207-
// branch only records rows already proven fully matched.
208-
let row_count = groups[row_group_index].num_rows() as usize;
209-
total_select += row_count;
210-
total_rows_fully_matched += row_count;
207+
let page_count = fully_matched_page_count(
208+
row_group_index,
209+
page_index_predicates,
210+
arrow_schema,
211+
parquet_schema,
212+
parquet_metadata,
213+
);
214+
total_pages_skipped_by_fully_matched += page_count;
211215

212216
continue;
213217
}
@@ -218,10 +222,13 @@ impl PagePruningAccessPlanFilter {
218222
let mut matched_pages_in_group: Option<HashSet<usize>> = None;
219223

220224
for predicate in page_index_predicates {
221-
let column = predicate
222-
.required_columns()
223-
.single_column()
224-
.expect("Page pruning requires single column predicates");
225+
let Some(column) = predicate.required_columns().single_column() else {
226+
debug!(
227+
"Ignoring multi-column page pruning predicate: {:?}",
228+
predicate.predicate_expr()
229+
);
230+
continue;
231+
};
225232

226233
let converter = StatisticsConverter::try_new(
227234
column.name(),
@@ -318,15 +325,15 @@ impl PagePruningAccessPlanFilter {
318325
file_metrics
319326
.page_index_rows_pruned
320327
.add_matched(total_select);
321-
file_metrics
322-
.page_index_rows_pruned
323-
.add_fully_matched(total_rows_fully_matched);
324328
file_metrics
325329
.page_index_pages_pruned
326330
.add_pruned(total_pages_skip);
327331
file_metrics
328332
.page_index_pages_pruned
329333
.add_matched(total_pages_select);
334+
file_metrics.add_page_index_pages_skipped_by_fully_matched(
335+
total_pages_skipped_by_fully_matched,
336+
);
330337
access_plan
331338
}
332339

@@ -346,6 +353,45 @@ fn update_selection(
346353
}
347354
}
348355

356+
/// Returns the number of pages for which page-index pruning is skipped because
357+
/// the containing row group is fully matched by row-group statistics.
358+
fn fully_matched_page_count(
359+
row_group_index: usize,
360+
page_index_predicates: &[PruningPredicate],
361+
arrow_schema: &Schema,
362+
parquet_schema: &SchemaDescriptor,
363+
parquet_metadata: &ParquetMetaData,
364+
) -> usize {
365+
let Some(offset_index) = parquet_metadata.offset_index() else {
366+
return 0;
367+
};
368+
369+
let Some(row_group_offsets) = offset_index.get(row_group_index) else {
370+
return 0;
371+
};
372+
373+
for predicate in page_index_predicates {
374+
let Some(column) = predicate.required_columns().single_column() else {
375+
continue;
376+
};
377+
378+
let Some((parquet_column_index, _)) =
379+
parquet_column(parquet_schema, arrow_schema, column.name())
380+
else {
381+
continue;
382+
};
383+
384+
let Some(offset_index_metadata) = row_group_offsets.get(parquet_column_index)
385+
else {
386+
continue;
387+
};
388+
389+
return offset_index_metadata.page_locations().len();
390+
}
391+
392+
0
393+
}
394+
349395
/// Returns a [`RowSelection`] for the rows in this row group to scan, in addition to a vec of
350396
/// booleans that state if each page was matched (true) or not (false).
351397
///

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl RowGroupAccessPlanFilter {
7474
self.access_plan
7575
}
7676

77+
/// Returns the is_fully_matched vector.
78+
pub fn is_fully_matched(&self) -> &Vec<bool> {
79+
self.access_plan.fully_matched()
80+
}
81+
7782
/// Prunes the access plan based on the limit and fully contained row groups.
7883
///
7984
/// The pruning works by leveraging the concept of fully matched row groups. Consider a query like:

datafusion/physical-expr-common/src/metrics/value.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,10 @@ impl MetricValue {
10101010
Self::SpilledBytes(_) => 11,
10111011
Self::SpilledRows(_) => 12,
10121012
Self::CurrentMemoryUsage(_) => 13,
1013-
Self::Count { .. } => 14,
1013+
Self::Count { name, .. } => match name.as_ref() {
1014+
"page_index_pages_skipped_by_fully_matched" => 8,
1015+
_ => 14,
1016+
},
10141017
Self::Gauge { .. } => 15,
10151018
Self::Time { .. } => 16,
10161019
Self::Ratio { .. } => 17,

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ Plan with Metrics
104104
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
105105
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
106106
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
107-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]
107+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18.31% (210/1.15 K)]
108108

109109
statement ok
110110
set datafusion.explain.analyze_level = dev;

datafusion/sqllogictest/test_files/explain_analyze.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
247247
----
248248
Plan with Metrics
249249
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3]
250-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
250+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)]
251251

252252
statement ok
253253
reset datafusion.explain.analyze_categories;
@@ -262,7 +262,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
262262
----
263263
Plan with Metrics
264264
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
265-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
265+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
266266

267267
statement ok
268268
reset datafusion.explain.analyze_categories;
@@ -277,7 +277,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order
277277
----
278278
Plan with Metrics
279279
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=<slt:ignore>]
280-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
280+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, scan_efficiency_ratio=<slt:ignore>]
281281

282282
statement ok
283283
reset datafusion.explain.analyze_categories;

0 commit comments

Comments
 (0)