Skip to content

Commit cfcc878

Browse files
committed
Conditionally build page pruning predicates
Page pruning predicates in the Parquet opener are constructed regardless of whether enable_page_index is set. Under high query load, this consumes significant CPU time, even though each predicate is created and discarded quickly. This commit reorders the predicate creation flow so that page pruning predicates are constructed only when enable_page_index is enabled. Regular pruning predicates are still always created, as before.
1 parent 1629420 commit cfcc878

File tree

2 files changed

+28
-33
lines changed

2 files changed

+28
-33
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ impl FileOpener for ParquetOpener {
285285
}
286286

287287
// Build predicates for this specific file
288-
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
288+
let pruning_predicate = build_pruning_predicates(
289289
predicate.as_ref(),
290290
&predicate_file_schema,
291291
&predicate_creation_errors,
@@ -294,15 +294,26 @@ impl FileOpener for ParquetOpener {
294294
// The page index is not stored inline in the parquet footer so the
295295
// code above may not have read the page index structures yet. If we
296296
// need them for reading and they aren't yet loaded, we need to load them now.
297-
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
298-
reader_metadata = load_page_index(
299-
reader_metadata,
300-
&mut async_file_reader,
301-
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
302-
options.with_page_index(true),
303-
)
304-
.await?;
305-
}
297+
let page_pruning_predicate = match (enable_page_index, &predicate) {
298+
(true, Some(predicate)) => {
299+
let page_pruning_predicate =
300+
build_page_pruning_predicate(&predicate, &predicate_file_schema);
301+
if page_pruning_predicate.filter_number() > 0 {
302+
reader_metadata = load_page_index(
303+
reader_metadata,
304+
&mut async_file_reader,
305+
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
306+
options.with_page_index(true),
307+
)
308+
.await?;
309+
310+
Some(page_pruning_predicate)
311+
} else {
312+
None
313+
}
314+
}
315+
_ => None,
316+
};
306317

307318
metadata_timer.stop();
308319

@@ -666,20 +677,16 @@ pub(crate) fn build_pruning_predicates(
666677
predicate: Option<&Arc<dyn PhysicalExpr>>,
667678
file_schema: &SchemaRef,
668679
predicate_creation_errors: &Count,
669-
) -> (
670-
Option<Arc<PruningPredicate>>,
671-
Option<Arc<PagePruningAccessPlanFilter>>,
672-
) {
680+
) -> Option<Arc<PruningPredicate>> {
673681
let Some(predicate) = predicate.as_ref() else {
674-
return (None, None);
682+
return None;
675683
};
676684
let pruning_predicate = build_pruning_predicate(
677685
Arc::clone(predicate),
678686
file_schema,
679687
predicate_creation_errors,
680688
);
681-
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
682-
(pruning_predicate, Some(page_pruning_predicate))
689+
pruning_predicate
683690
}
684691

685692
/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
@@ -713,18 +720,6 @@ async fn load_page_index<T: AsyncFileReader>(
713720
}
714721
}
715722

716-
fn should_enable_page_index(
717-
enable_page_index: bool,
718-
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
719-
) -> bool {
720-
enable_page_index
721-
&& page_pruning_predicate.is_some()
722-
&& page_pruning_predicate
723-
.as_ref()
724-
.map(|p| p.filter_number() > 0)
725-
.unwrap_or(false)
726-
}
727-
728723
#[cfg(test)]
729724
mod test {
730725
use std::sync::Arc;

datafusion/datasource-parquet/src/source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -659,19 +659,19 @@ impl FileSource for ParquetSource {
659659

660660
write!(f, "{predicate_string}")?;
661661

662-
// Try to build a the pruning predicates.
662+
// Try to build the pruning predicates.
663663
// These are only generated here because it's useful to have *some*
664664
// idea of what pushdown is happening when viewing plans.
665-
// However it is important to note that these predicates are *not*
665+
// However, it is important to note that these predicates are *not*
666666
// necessarily the predicates that are actually evaluated:
667667
// the actual predicates are built in reference to the physical schema of
668668
// each file, which we do not have at this point and hence cannot use.
669-
// Instead we use the logical schema of the file (the table schema without partition columns).
669+
// Instead, we use the logical schema of the file (the table schema without partition columns).
670670
if let (Some(file_schema), Some(predicate)) =
671671
(&self.file_schema, &self.predicate)
672672
{
673673
let predicate_creation_errors = Count::new();
674-
if let (Some(pruning_predicate), _) = build_pruning_predicates(
674+
if let Some(pruning_predicate) = build_pruning_predicates(
675675
Some(predicate),
676676
file_schema,
677677
&predicate_creation_errors,

0 commit comments

Comments
 (0)