diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 6621706c35c8..0ea0cae189d8 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -772,12 +772,22 @@ impl MetadataLoadedParquetOpen {
         prepared.physical_file_schema = Arc::clone(&physical_file_schema);
 
         // Build predicates for this specific file
-        let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
+        let pruning_predicate = build_pruning_predicates(
             prepared.predicate.as_ref(),
             &physical_file_schema,
             &prepared.predicate_creation_errors,
         );
 
+        // Only build page pruning predicate if page index is enabled
+        let page_pruning_predicate = if prepared.enable_page_index {
+            prepared.predicate.as_ref().and_then(|predicate| {
+                let p = build_page_pruning_predicate(predicate, &physical_file_schema);
+                (p.filter_number() > 0).then_some(p)
+            })
+        } else {
+            None
+        };
+
         Ok(FiltersPreparedParquetOpen {
             loaded: MetadataLoadedParquetOpen {
                 prepared,
@@ -797,10 +807,7 @@ impl FiltersPreparedParquetOpen {
         // metadata load above may not have read the page index structures yet.
         // If we need them for reading and they aren't yet loaded, we need to
         // load them now.
-        if should_enable_page_index(
-            self.loaded.prepared.enable_page_index,
-            &self.page_pruning_predicate,
-        ) {
+        if self.page_pruning_predicate.is_some() {
             self.loaded.reader_metadata = load_page_index(
                 self.loaded.reader_metadata,
                 &mut self.loaded.prepared.async_file_reader,
@@ -1513,20 +1520,13 @@ pub(crate) fn build_pruning_predicates(
     predicate: Option<&Arc<dyn PhysicalExpr>>,
     file_schema: &SchemaRef,
     predicate_creation_errors: &Count,
-) -> (
-    Option<Arc<PruningPredicate>>,
-    Option<Arc<PagePruningAccessPlanFilter>>,
-) {
-    let Some(predicate) = predicate.as_ref() else {
-        return (None, None);
-    };
-    let pruning_predicate = build_pruning_predicate(
+) -> Option<Arc<PruningPredicate>> {
+    let predicate = predicate.as_ref()?;
+    build_pruning_predicate(
         Arc::clone(predicate),
         file_schema,
         predicate_creation_errors,
-    );
-    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
-    (pruning_predicate, Some(page_pruning_predicate))
+    )
 }
 
 /// Returns a `ArrowReaderMetadata` with the page index loaded, loading
@@ -1560,18 +1560,6 @@ async fn load_page_index(
     }
 }
 
-fn should_enable_page_index(
-    enable_page_index: bool,
-    page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
-) -> bool {
-    enable_page_index
-        && page_pruning_predicate.is_some()
-        && page_pruning_predicate
-            .as_ref()
-            .map(|p| p.filter_number() > 0)
-            .unwrap_or(false)
-}
-
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
@@ -1701,6 +1689,12 @@ mod test {
             self
         }
 
+        /// Enable page index.
+        fn with_enable_page_index(mut self, enable: bool) -> Self {
+            self.enable_page_index = enable;
+            self
+        }
+
         /// Set reverse row groups flag.
         fn with_reverse_row_groups(mut self, enable: bool) -> Self {
             self.reverse_row_groups = enable;
@@ -2564,4 +2558,71 @@ mod test {
             "Reverse scan with non-contiguous row groups should correctly map RowSelection"
         );
     }
+
+    /// Test that page pruning predicates are only built and applied when `enable_page_index` is true.
+    ///
+    /// The file has a single row group with 10 pages (10 rows each, values 1..100).
+    /// With page index enabled, pages whose max value <= 90 are pruned, returning only
+    /// the last page (rows 91..100). With page index disabled, all 100 rows are returned
+    /// since neither pushdown nor row-group pruning is active.
+    #[tokio::test]
+    async fn test_page_pruning_predicate_respects_enable_page_index() {
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 100 rows with values 1..=100, written as a single row group with 10 rows per page
+        let values: Vec<i32> = (1..=100).collect();
+        let batch = record_batch!((
+            "a",
+            Int32,
+            values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
+        ))
+        .unwrap();
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(10)
+            .set_write_batch_size(10)
+            .build();
+        let schema = batch.schema();
+        let data_size = write_parquet_batches(
+            Arc::clone(&store),
+            "test.parquet",
+            vec![batch],
+            Some(props),
+        )
+        .await;
+
+        let file = PartitionedFile::new("test.parquet".to_string(), data_size as u64);
+
+        // predicate: a > 90 — should allow page index to prune first 9 pages
+        let predicate = logical2physical(&col("a").gt(lit(90i32)), &schema);
+
+        let make_opener = |enable_page_index| {
+            ParquetOpenerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_schema(Arc::clone(&schema))
+                .with_predicate(Arc::clone(&predicate))
+                .with_enable_page_index(enable_page_index)
+                // disable pushdown and row-group pruning so the only pruning path is page index
+                .with_pushdown_filters(false)
+                .with_row_group_stats_pruning(false)
+                .build()
+        };
+        let (_, rows_with_page_index) = count_batches_and_rows(
+            make_opener(true).open(file.clone()).unwrap().await.unwrap(),
+        )
+        .await;
+        let (_, rows_without_page_index) =
+            count_batches_and_rows(make_opener(false).open(file).unwrap().await.unwrap())
+                .await;
+
+        assert_eq!(
+            rows_with_page_index, 10,
+            "page index should prune 9 of 10 pages"
+        );
+        assert_eq!(
+            rows_without_page_index, 100,
+            "without page index all rows are returned"
+        );
+    }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 3a64137a2a3f..d6ddd56e6768 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -627,17 +627,17 @@ impl FileSource for ParquetSource {
             write!(f, ", reverse_row_groups=true")?;
         }
 
-        // Try to build a the pruning predicates.
+        // Try to build the pruning predicates.
         // These are only generated here because it's useful to have *some*
         // idea of what pushdown is happening when viewing plans.
-        // However it is important to note that these predicates are *not*
+        // However, it is important to note that these predicates are *not*
         // necessarily the predicates that are actually evaluated:
         // the actual predicates are built in reference to the physical schema of
        // each file, which we do not have at this point and hence cannot use.
-        // Instead we use the logical schema of the file (the table schema without partition columns).
+        // Instead, we use the logical schema of the file (the table schema without partition columns).
         if let Some(predicate) = &self.predicate {
             let predicate_creation_errors = Count::new();
-            if let (Some(pruning_predicate), _) = build_pruning_predicates(
+            if let Some(pruning_predicate) = build_pruning_predicates(
                 Some(predicate),
                 self.table_schema.table_schema(),
                 &predicate_creation_errors,