Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) metrics: ArrowReaderMetrics,

pub(crate) max_predicate_cache_size: usize,

/// Row groups where ALL rows are known to match the filter predicate.
///
/// For these row groups, the [`RowFilter`] evaluation is skipped entirely
/// since the predicate is guaranteed to be true for every row.
pub(crate) fully_matched_row_groups: Option<Vec<usize>>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
Expand Down Expand Up @@ -178,6 +184,7 @@ impl<T> ArrowReaderBuilder<T> {
offset: None,
metrics: ArrowReaderMetrics::Disabled,
max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
fully_matched_row_groups: None,
}
}

Expand Down Expand Up @@ -344,6 +351,28 @@ impl<T> ArrowReaderBuilder<T> {
}
}

/// Declare row groups for which every row is guaranteed to satisfy the
/// filter predicate.
///
/// Any [`RowFilter`] supplied through [`Self::with_row_filter`] is bypassed
/// entirely for these row groups: since the predicate is known to evaluate
/// to `true` for every row, decoding the filter columns and evaluating the
/// predicate expression would be wasted work.
///
/// Callers typically derive this set from row group statistics — when the
/// statistics prove that all rows satisfy the predicate, the row group is
/// considered "fully matched."
///
/// The supplied indices must be a subset of the row groups selected via
/// [`Self::with_row_groups`] (or of all row groups when none were
/// specified); indices not present in that list are ignored.
pub fn with_fully_matched_row_groups(mut self, row_groups: Vec<usize>) -> Self {
    self.fully_matched_row_groups = Some(row_groups);
    self
}

/// Provide a limit to the number of rows to be read
///
/// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
Expand Down Expand Up @@ -1188,6 +1217,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
metrics,
// Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
max_predicate_cache_size: _,
// Not used for the sync reader (single row group per reader)
fully_matched_row_groups: _,
} = self;

// Try to avoid allocate large buffer
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
offset,
metrics,
max_predicate_cache_size,
fully_matched_row_groups,
} = self;

// Ensure schema of ParquetRecordBatchStream respects projection, and does
Expand All @@ -522,6 +523,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
offset,
metrics,
max_predicate_cache_size,
fully_matched_row_groups,
}
.build()?;

Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder {
metrics,
row_selection_policy,
max_predicate_cache_size,
fully_matched_row_groups,
} = self;

// If no row groups were specified, read all of them
Expand All @@ -197,6 +198,7 @@ impl ParquetPushDecoderBuilder {
max_predicate_cache_size,
buffers,
row_selection_policy,
fully_matched_row_groups,
);

// Initialize the decoder with the configured options
Expand Down
25 changes: 25 additions & 0 deletions parquet/src/arrow/push_decoder/reader_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use bytes::Bytes;
use data::DataRequest;
use filter::AdvanceResult;
use filter::FilterInfo;
use std::collections::HashSet;
use std::ops::Range;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -160,6 +161,10 @@ pub(crate) struct RowGroupReaderBuilder {
/// Strategy for materialising row selections
row_selection_policy: RowSelectionPolicy,

/// Row groups where ALL rows are known to match the filter predicate.
/// For these row groups, filter evaluation is skipped entirely.
fully_matched_row_groups: HashSet<usize>,

/// Current state of the decoder.
///
/// It is taken when processing, and must be put back before returning
Expand All @@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
max_predicate_cache_size: usize,
buffers: PushBuffers,
row_selection_policy: RowSelectionPolicy,
fully_matched_row_groups: Option<Vec<usize>>,
) -> Self {
Self {
batch_size,
Expand All @@ -197,6 +203,9 @@ impl RowGroupReaderBuilder {
metrics,
max_predicate_cache_size,
row_selection_policy,
fully_matched_row_groups: fully_matched_row_groups
.map(|v| v.into_iter().collect())
.unwrap_or_default(),
state: Some(RowGroupDecoderState::Finished),
buffers,
}
Expand Down Expand Up @@ -328,6 +337,22 @@ impl RowGroupReaderBuilder {
}));
};

// Skip filter for fully matched row groups: all rows are known
// to satisfy the predicate based on row group statistics, so
// evaluating the filter would be wasted work.
if self
.fully_matched_row_groups
.contains(&row_group_info.row_group_idx)
{
// Put the filter back for subsequent non-fully-matched row groups
self.filter = Some(filter);
return Ok(NextState::again(RowGroupDecoderState::StartData {
row_group_info,
column_chunks,
cache_info: None,
}));
}

// we have predicates to evaluate
let cache_projection =
self.compute_cache_projection(row_group_info.row_group_idx, &filter);
Expand Down
Loading