From e5905e20fcdc2b4d4c75fcf247120f4cc3943970 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 8 Apr 2026 21:43:18 +0200 Subject: [PATCH 1/3] Conditionally build page pruning predicates Page pruning predicates in the Parquet opener are constructed regardless of whether enable_page_index is set. Under high query load, this uses significant CPU time although these predicates are created and discarded quickly. This commit reorders the predicate creation flow to only construct page pruning predicates if enable_page_index is enabled. Regular predicates are created always as before. --- datafusion/datasource-parquet/src/opener.rs | 43 +++++++-------------- datafusion/datasource-parquet/src/source.rs | 8 ++-- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6621706c35c81..43482ac7872e8 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -772,12 +772,22 @@ impl MetadataLoadedParquetOpen { prepared.physical_file_schema = Arc::clone(&physical_file_schema); // Build predicates for this specific file - let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( + let pruning_predicate = build_pruning_predicates( prepared.predicate.as_ref(), &physical_file_schema, &prepared.predicate_creation_errors, ); + // Only build page pruning predicate if page index is enabled + let page_pruning_predicate = if prepared.enable_page_index { + prepared.predicate.as_ref().and_then(|predicate| { + let p = build_page_pruning_predicate(predicate, &physical_file_schema); + (p.filter_number() > 0).then_some(p) + }) + } else { + None + }; + Ok(FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen { prepared, @@ -797,10 +807,7 @@ impl FiltersPreparedParquetOpen { // metadata load above may not have read the page index structures yet. // If we need them for reading and they aren't yet loaded, we need to // load them now. - if should_enable_page_index( - self.loaded.prepared.enable_page_index, - &self.page_pruning_predicate, - ) { + if self.page_pruning_predicate.is_some() { self.loaded.reader_metadata = load_page_index( self.loaded.reader_metadata, &mut self.loaded.prepared.async_file_reader, @@ -1513,20 +1520,11 @@ pub(crate) fn build_pruning_predicates( predicate: Option<&Arc>, file_schema: &SchemaRef, predicate_creation_errors: &Count, -) -> ( - Option>, - Option>, -) { +) -> Option> { let Some(predicate) = predicate.as_ref() else { - return (None, None); + return None; }; - let pruning_predicate = build_pruning_predicate( - Arc::clone(predicate), - file_schema, - predicate_creation_errors, - ); - let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema); - (pruning_predicate, Some(page_pruning_predicate)) + build_pruning_predicate(Arc::clone(predicate), file_schema, predicate_creation_errors) } /// Returns a `ArrowReaderMetadata` with the page index loaded, loading @@ -1560,17 +1558,6 @@ async fn load_page_index( } } -fn should_enable_page_index( - enable_page_index: bool, - page_pruning_predicate: &Option>, -) -> bool { - enable_page_index - && page_pruning_predicate.is_some() - && page_pruning_predicate - .as_ref() - .map(|p| p.filter_number() > 0) - .unwrap_or(false) -} #[cfg(test)] mod test { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 3a64137a2a3f8..d6ddd56e67685 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -627,17 +627,17 @@ impl FileSource for ParquetSource { write!(f, ", reverse_row_groups=true")?; } - // Try to build a the pruning predicates. + // Try to build the pruning predicates. // These are only generated here because it's useful to have *some* // idea of what pushdown is happening when viewing plans. - // However it is important to note that these predicates are *not* + // However, it is important to note that these predicates are *not* // necessarily the predicates that are actually evaluated: // the actual predicates are built in reference to the physical schema of // each file, which we do not have at this point and hence cannot use. - // Instead we use the logical schema of the file (the table schema without partition columns). + // Instead, we use the logical schema of the file (the table schema without partition columns). if let Some(predicate) = &self.predicate { let predicate_creation_errors = Count::new(); - if let (Some(pruning_predicate), _) = build_pruning_predicates( + if let Some(pruning_predicate) = build_pruning_predicates( Some(predicate), self.table_schema.table_schema(), &predicate_creation_errors, From 5bfde8f8b6dfe840012cf9316b0d91678114a970 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 9 Apr 2026 20:21:38 +0200 Subject: [PATCH 2/3] Add test and fix CI --- datafusion/datasource-parquet/src/opener.rs | 79 +++++++++++++++++++-- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 43482ac7872e8..d835165201b30 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1521,10 +1521,12 @@ pub(crate) fn build_pruning_predicates( file_schema: &SchemaRef, predicate_creation_errors: &Count, ) -> Option> { - let Some(predicate) = predicate.as_ref() else { - return None; - }; - build_pruning_predicate(Arc::clone(predicate), file_schema, predicate_creation_errors) + let predicate = predicate.as_ref()?; + build_pruning_predicate( + Arc::clone(predicate), + file_schema, + predicate_creation_errors, + ) } /// Returns a `ArrowReaderMetadata` with the page index loaded, loading @@ -1558,7 +1560,6 @@ async fn load_page_index( } } - #[cfg(test)] mod test { use std::sync::Arc; @@ -2551,4 +2552,72 @@ mod test { "Reverse scan with non-contiguous row groups should correctly map RowSelection" ); } + + /// Test that page pruning predicates are only built and applied when `enable_page_index` is true. + /// + /// The file has a single row group with 10 pages (10 rows each, values 1..100). + /// With page index enabled, pages whose max value <= 90 are pruned, returning only + /// the last page (rows 91..100). With page index disabled, all 100 rows are returned + /// since neither pushdown nor row-group pruning is active. + #[tokio::test] + async fn test_page_pruning_predicate_respects_enable_page_index() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + // 100 rows with values 1..=100, written as a single row group with 10 rows per page + let values: Vec = (1..=100).collect(); + let batch = record_batch!(( + "a", + Int32, + values.iter().map(|v| Some(*v)).collect::>() + )) + .unwrap(); + let props = WriterProperties::builder() + .set_data_page_row_count_limit(10) + .set_write_batch_size(10) + .build(); + let schema = batch.schema(); + let data_size = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch], + Some(props), + ) + .await; + + let file = PartitionedFile::new("test.parquet".to_string(), data_size as u64); + + // predicate: a > 90 — should allow page index to prune first 9 pages + let predicate = logical2physical(&col("a").gt(lit(90i32)), &schema); + + let make_opener = |enable_page_index| { + let mut opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_predicate(Arc::clone(&predicate)) + // disable pushdown and row-group pruning so the only pruning path is page index + .with_pushdown_filters(false) + .with_row_group_stats_pruning(false) + .build(); + opener.enable_page_index = enable_page_index; + opener + }; + let (_, rows_with_page_index) = count_batches_and_rows( + make_opener(true).open(file.clone()).unwrap().await.unwrap(), + ) + .await; + let (_, rows_without_page_index) = + count_batches_and_rows(make_opener(false).open(file).unwrap().await.unwrap()) + .await; + + assert_eq!( + rows_with_page_index, 10, + "page index should prune 9 of 10 pages" + ); + assert_eq!( + rows_without_page_index, 100, + "without page index all rows are returned" + ); + } } From 590a5178c8ffb17873f612a9c1da234fc1a18ff3 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 9 Apr 2026 20:39:00 +0200 Subject: [PATCH 3/3] Add builder method --- datafusion/datasource-parquet/src/opener.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index d835165201b30..0ea0cae189d84 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1689,6 +1689,12 @@ mod test { self } + /// Enable page index. + fn with_enable_page_index(mut self, enable: bool) -> Self { + self.enable_page_index = enable; + self + } + /// Set reverse row groups flag. fn with_reverse_row_groups(mut self, enable: bool) -> Self { self.reverse_row_groups = enable; @@ -2592,16 +2598,15 @@ mod test { let predicate = logical2physical(&col("a").gt(lit(90i32)), &schema); let make_opener = |enable_page_index| { - let mut opener = ParquetOpenerBuilder::new() + ParquetOpenerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_predicate(Arc::clone(&predicate)) + .with_enable_page_index(enable_page_index) // disable pushdown and row-group pruning so the only pruning path is page index .with_pushdown_filters(false) .with_row_group_stats_pruning(false) - .build(); - opener.enable_page_index = enable_page_index; - opener + .build() }; let (_, rows_with_page_index) = count_batches_and_rows( make_opener(true).open(file.clone()).unwrap().await.unwrap(),