From 76c028492c13c9319ca189416a84b0dd132e26ca Mon Sep 17 00:00:00 2001
From: "xudong.w"
Date: Sun, 12 Apr 2026 22:52:31 +0800
Subject: [PATCH] feat(parquet): add with_fully_matched_row_groups to skip
 filter for fully matched row groups

When row group statistics prove that all rows in a row group satisfy the
filter predicate, the RowFilter evaluation can be skipped entirely for those
row groups. This avoids the cost of decoding filter columns and evaluating
the predicate expression.

Adds `with_fully_matched_row_groups(Vec<usize>)` to ArrowReaderBuilder, which
flows through to RowGroupReaderBuilder. When processing a fully matched row
group, the Start state transitions directly to StartData, bypassing all
filter evaluation.

Co-Authored-By: Claude Opus 4.6
---
 parquet/src/arrow/arrow_reader/mod.rs        | 31 +++++++++++++++++++
 parquet/src/arrow/async_reader/mod.rs        |  2 ++
 parquet/src/arrow/push_decoder/mod.rs        |  2 ++
 .../arrow/push_decoder/reader_builder/mod.rs | 25 +++++++++++++++
 4 files changed, 60 insertions(+)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 1b02c4ae25d3..1b94df5b4104 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -139,6 +139,12 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) metrics: ArrowReaderMetrics,
 
     pub(crate) max_predicate_cache_size: usize,
+
+    /// Row groups where ALL rows are known to match the filter predicate.
+    ///
+    /// For these row groups, the [`RowFilter`] evaluation is skipped entirely
+    /// since the predicate is guaranteed to be true for every row.
+    pub(crate) fully_matched_row_groups: Option<Vec<usize>>,
 }
 
 impl<T> Debug for ArrowReaderBuilder<T> {
@@ -178,6 +184,7 @@ impl<T> ArrowReaderBuilder<T> {
             offset: None,
             metrics: ArrowReaderMetrics::Disabled,
             max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
+            fully_matched_row_groups: None,
         }
     }
 
@@ -344,6 +351,28 @@ impl<T> ArrowReaderBuilder<T> {
         }
     }
 
+    /// Specify row groups where ALL rows are known to match the filter predicate.
+    ///
+    /// For these row groups, the [`RowFilter`] evaluation (set via
+    /// [`Self::with_row_filter`]) is skipped entirely since the predicate is
+    /// guaranteed to be `true` for every row. This avoids the cost of decoding
+    /// filter columns and evaluating the predicate expression for those row
+    /// groups.
+    ///
+    /// This is typically determined by evaluating row group statistics: if the
+    /// statistics prove that all rows satisfy the predicate, the row group is
+    /// "fully matched."
+    ///
+    /// The provided indices must be a subset of the row groups specified via
+    /// [`Self::with_row_groups`] (or all row groups if none were specified).
+    /// Indices not present in the row group list are ignored.
+    pub fn with_fully_matched_row_groups(self, row_groups: Vec<usize>) -> Self {
+        Self {
+            fully_matched_row_groups: Some(row_groups),
+            ..self
+        }
+    }
+
     /// Provide a limit to the number of rows to be read
     ///
     /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
@@ -1188,6 +1217,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
             metrics,
             // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
             max_predicate_cache_size: _,
+            // Not used for the sync reader (single row group per reader)
+            fully_matched_row_groups: _,
         } = self;
 
         // Try to avoid allocate large buffer
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c3168c..90e02fa03570 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -497,6 +497,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset,
             metrics,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         } = self;
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -522,6 +523,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset,
             metrics,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         }
         .build()?;
 
diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 24384471a4ec..8ba2870bfc85 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder {
             metrics,
             row_selection_policy,
             max_predicate_cache_size,
+            fully_matched_row_groups,
         } = self;
 
         // If no row groups were specified, read all of them
@@ -197,6 +198,7 @@
             max_predicate_cache_size,
             buffers,
             row_selection_policy,
+            fully_matched_row_groups,
         );
 
         // Initialize the decoder with the configured options
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 922d8070c064..ca798d387d39 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -38,6 +38,7 @@
 use bytes::Bytes;
 use data::DataRequest;
 use filter::AdvanceResult;
 use filter::FilterInfo;
+use std::collections::HashSet;
 use std::ops::Range;
 use std::sync::{Arc, RwLock};
@@ -160,6 +161,10 @@ pub(crate) struct RowGroupReaderBuilder {
     /// Strategy for materialising row selections
     row_selection_policy: RowSelectionPolicy,
 
+    /// Row groups where ALL rows are known to match the filter predicate.
+    /// For these row groups, filter evaluation is skipped entirely.
+    fully_matched_row_groups: HashSet<usize>,
+
     /// Current state of the decoder.
     ///
     /// It is taken when processing, and must be put back before returning
@@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
         max_predicate_cache_size: usize,
         buffers: PushBuffers,
         row_selection_policy: RowSelectionPolicy,
+        fully_matched_row_groups: Option<Vec<usize>>,
     ) -> Self {
         Self {
             batch_size,
@@ -197,6 +203,9 @@ impl RowGroupReaderBuilder {
             metrics,
             max_predicate_cache_size,
             row_selection_policy,
+            fully_matched_row_groups: fully_matched_row_groups
+                .map(|v| v.into_iter().collect())
+                .unwrap_or_default(),
             state: Some(RowGroupDecoderState::Finished),
             buffers,
         }
     }
@@ -328,6 +337,22 @@ impl RowGroupReaderBuilder {
                     }));
                 };
 
+                // Skip filter for fully matched row groups: all rows are known
+                // to satisfy the predicate based on row group statistics, so
+                // evaluating the filter would be wasted work.
+ if self + .fully_matched_row_groups + .contains(&row_group_info.row_group_idx) + { + // Put the filter back for subsequent non-fully-matched row groups + self.filter = Some(filter); + return Ok(NextState::again(RowGroupDecoderState::StartData { + row_group_info, + column_chunks, + cache_info: None, + })); + } + // we have predicates to evaluate let cache_projection = self.compute_cache_projection(row_group_info.row_group_idx, &filter);