diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 1b02c4ae25d3..1b94df5b4104 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -139,6 +139,12 @@ pub struct ArrowReaderBuilder {
     pub(crate) metrics: ArrowReaderMetrics,
 
     pub(crate) max_predicate_cache_size: usize,
+
+    /// Row groups where ALL rows are known to match the filter predicate.
+    ///
+    /// For these row groups, the [`RowFilter`] evaluation is skipped entirely
+    /// since the predicate is guaranteed to be true for every row.
+    pub(crate) fully_matched_row_groups: Option<Vec<usize>>,
 }
 
 impl Debug for ArrowReaderBuilder {
@@ -178,6 +184,7 @@ impl ArrowReaderBuilder {
             offset: None,
             metrics: ArrowReaderMetrics::Disabled,
             max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
+            fully_matched_row_groups: None,
         }
     }
@@ -344,6 +351,28 @@ impl ArrowReaderBuilder {
         }
     }
+
+    /// Specify row groups where ALL rows are known to match the filter predicate.
+    ///
+    /// For these row groups, the [`RowFilter`] evaluation (set via
+    /// [`Self::with_row_filter`]) is skipped entirely since the predicate is
+    /// guaranteed to be `true` for every row. This avoids the cost of decoding
+    /// filter columns and evaluating the predicate expression for those row
+    /// groups.
+    ///
+    /// This is typically determined by evaluating row group statistics: if the
+    /// statistics prove that all rows satisfy the predicate, the row group is
+    /// "fully matched."
+    ///
+    /// The provided indices must be a subset of the row groups specified via
+    /// [`Self::with_row_groups`] (or all row groups if none were specified).
+    /// Indices not present in the row group list are ignored.
+    pub fn with_fully_matched_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            fully_matched_row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
     /// Provide a limit to the number of rows to be read
     ///
     /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
@@ -1188,6 +1217,8 @@ impl ParquetRecordBatchReaderBuilder {
             metrics,
             // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
             max_predicate_cache_size: _,
+            // Not used for the sync reader (single row group per reader)
+            fully_matched_row_groups: _,
         } = self;
 
         // Try to avoid allocate large buffer
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c3168c..90e02fa03570 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -497,6 +497,7 @@ impl ParquetRecordBatchStreamBuilder {
             offset,
             metrics,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         } = self;
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -522,6 +523,7 @@ impl ParquetRecordBatchStreamBuilder {
             offset,
             metrics,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         }
         .build()?;
diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 24384471a4ec..8ba2870bfc85 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder {
             metrics,
             row_selection_policy,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         } = self;
 
         // If no row groups were specified, read all of them
@@ -197,6 +198,7 @@ impl ParquetPushDecoderBuilder {
             max_predicate_cache_size,
             buffers,
             row_selection_policy,
+            fully_matched_row_groups,
         );
 
         // Initialize the decoder with the configured options
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 922d8070c064..ca798d387d39 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -38,6 +38,7 @@ use bytes::Bytes;
 use data::DataRequest;
 use filter::AdvanceResult;
 use filter::FilterInfo;
+use std::collections::HashSet;
 use std::ops::Range;
 use std::sync::{Arc, RwLock};
@@ -160,6 +161,10 @@ pub(crate) struct RowGroupReaderBuilder {
     /// Strategy for materialising row selections
     row_selection_policy: RowSelectionPolicy,
 
+    /// Row groups where ALL rows are known to match the filter predicate.
+    /// For these row groups, filter evaluation is skipped entirely.
+    fully_matched_row_groups: HashSet<usize>,
+
     /// Current state of the decoder.
     ///
     /// It is taken when processing, and must be put back before returning
@@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
         max_predicate_cache_size: usize,
         buffers: PushBuffers,
         row_selection_policy: RowSelectionPolicy,
+        fully_matched_row_groups: Option<Vec<usize>>,
     ) -> Self {
         Self {
             batch_size,
@@ -197,6 +203,9 @@ impl RowGroupReaderBuilder {
             metrics,
             max_predicate_cache_size,
             row_selection_policy,
+            fully_matched_row_groups: fully_matched_row_groups
+                .map(|v| v.into_iter().collect())
+                .unwrap_or_default(),
             state: Some(RowGroupDecoderState::Finished),
             buffers,
         }
     }
@@ -328,6 +337,22 @@ impl RowGroupReaderBuilder {
            }));
        };
 
+        // Skip filter for fully matched row groups: all rows are known
+        // to satisfy the predicate based on row group statistics, so
+        // evaluating the filter would be wasted work.
+        if self
+            .fully_matched_row_groups
+            .contains(&row_group_info.row_group_idx)
+        {
+            // Put the filter back for subsequent non-fully-matched row groups
+            self.filter = Some(filter);
+            return Ok(NextState::again(RowGroupDecoderState::StartData {
+                row_group_info,
+                column_chunks,
+                cache_info: None,
+            }));
+        }
+
        // we have predicates to evaluate
        let cache_projection =
            self.compute_cache_projection(row_group_info.row_group_idx, &filter);
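
Usage sketch (assuming the builder API added in this diff, the existing async reader API, and tokio with the parquet `async` feature enabled). The file path "data.parquet", the always-true predicate, and the row group indices are placeholders; in practice the fully matched indices would come from evaluating the same predicate against row group statistics.

    use arrow_array::{BooleanArray, RecordBatch};
    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
    use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = tokio::fs::File::open("data.parquet").await?;
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

        // Stand-in predicate over the first leaf column; a real caller would
        // evaluate an actual comparison against the batch here.
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
        let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        });

        // Suppose statistics proved that every row in row groups 0 and 2
        // satisfies the predicate: the filter is then skipped for those
        // row groups but still evaluated for row group 1.
        let mut stream = builder
            .with_row_groups(vec![0, 1, 2])
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .with_fully_matched_row_groups(vec![0, 2])
            .build()?;

        while let Some(batch) = stream.try_next().await? {
            println!("read {} rows", batch.num_rows());
        }
        Ok(())
    }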