Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 206 additions & 132 deletions parquet/src/arrow/push_decoder/mod.rs

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions parquet/src/arrow/push_decoder/reader_builder/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRo
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use std::ops::Range;
Expand Down Expand Up @@ -55,7 +54,7 @@ impl DataRequest {
}

/// Returns the chunks from the buffers that satisfy this request
fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
fn get_chunks(&self, buffers: &mut PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
self.ranges
.iter()
.map(|range| {
Expand All @@ -72,10 +71,12 @@ impl DataRequest {
.collect()
}

/// Create a new InMemoryRowGroup, and fill it with provided data
/// Create a new InMemoryRowGroup, and fill it with provided data.
///
/// Assumes that all needed data is present in the buffers
/// and clears any explicitly requested ranges
/// Assumes that all needed data is present in the buffers.
/// Does **not** release any buffers — the caller is responsible for
/// calling [`PushBuffers::release_range`] at the appropriate time
/// (typically after all column chunks for a row group are complete).
pub fn try_into_in_memory_row_group<'a>(
self,
row_group_idx: usize,
Expand All @@ -88,7 +89,7 @@ impl DataRequest {

let Self {
column_chunks,
ranges,
ranges: _,
page_start_offsets,
} = self;

Expand All @@ -105,9 +106,6 @@ impl DataRequest {

in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);

// Clear the ranges that were explicitly requested
buffers.clear_ranges(&ranges);

Ok(in_memory_row_group)
}
}
Expand Down
64 changes: 56 additions & 8 deletions parquet/src/arrow/push_decoder/reader_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::util::push_buffers::PushBuffers;
use crate::util::retention::RetentionSet;
use bytes::Bytes;
use data::DataRequest;
use filter::AdvanceResult;
Expand Down Expand Up @@ -168,6 +169,12 @@ pub(crate) struct RowGroupReaderBuilder {

/// The underlying data store
buffers: PushBuffers,

/// Optional retention filter. When present, incoming `push_data` buffers
/// are trimmed to only keep byte ranges the decoder will eventually need.
/// The set is extended with ranges the decoder explicitly requests via
/// `NeedsData`, so the filter only discards truly speculative prefetch.
retention: Option<RetentionSet>,
}

impl RowGroupReaderBuilder {
Expand All @@ -185,6 +192,7 @@ impl RowGroupReaderBuilder {
max_predicate_cache_size: usize,
buffers: PushBuffers,
row_selection_policy: RowSelectionPolicy,
retention: Option<RetentionSet>,
) -> Self {
Self {
batch_size,
Expand All @@ -199,22 +207,45 @@ impl RowGroupReaderBuilder {
row_selection_policy,
state: Some(RowGroupDecoderState::Finished),
buffers,
retention,
}
}

/// Push new data buffers that can be used to satisfy pending requests
/// Push new data buffers that can be used to satisfy pending requests.
///
/// When a [`RetentionSet`] is configured, incoming buffers are filtered so
/// that only byte ranges the decoder will eventually need are stored.
/// Portions outside the retention set are silently discarded.
pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
self.buffers.push_ranges(ranges, buffers);
let (ranges, buffers) = match &self.retention {
Some(retention) => retention.filter(ranges, buffers),
None => (ranges, buffers),
};
if !ranges.is_empty() {
self.buffers.push_ranges(ranges, buffers);
}
}

/// Returns the total number of bytes currently buffered across all
/// pushed ranges (delegates to [`PushBuffers::buffered_bytes`]).
pub fn buffered_bytes(&self) -> u64 {
self.buffers.buffered_bytes()
}

/// Clear any staged ranges currently buffered for future decode work.
pub fn clear_all_ranges(&mut self) {
self.buffers.clear_all_ranges();
/// Release all staged ranges currently buffered for future decode work.
Comment thread
HippoBaro marked this conversation as resolved.
/// Releases every byte range currently held in the internal buffers
/// (delegates to [`PushBuffers::release_all`]).
pub(crate) fn release_all(&mut self) {
self.buffers.release_all();
}

/// Release all column chunk byte ranges for a given row group.
///
/// This is safe to call even if some (or all) column chunks were never
/// fetched — [`PushBuffers::release_range`] is a no-op for ranges that
/// have no overlapping entry.
fn release_row_group(&mut self, row_group_idx: usize) {
    // Compute each column chunk's byte span first, then release them,
    // so the read of `self.metadata` and the mutation of `self.buffers`
    // are clearly separated.
    let spans: Vec<_> = self
        .metadata
        .row_group(row_group_idx)
        .columns()
        .iter()
        .map(|column| {
            let (offset, length) = column.byte_range();
            offset..offset + length
        })
        .collect();
    for span in spans {
        self.buffers.release_range(span);
    }
}

/// take the current state, leaving None in its place.
Expand Down Expand Up @@ -280,6 +311,13 @@ impl RowGroupReaderBuilder {
} => {
// put back the next state
self.state = Some(next_state);
// Extend the retention set with explicitly requested
// ranges so the IO layer's response is not filtered out.
if let DecodeResult::NeedsData(ref ranges) = result {
if let Some(retention) = &mut self.retention {
retention.extend(ranges);
}
}
return Ok(result);
}
// completed one internal state, maybe can proceed further
Expand Down Expand Up @@ -361,7 +399,9 @@ impl RowGroupReaderBuilder {

// If nothing is selected, we are done with this row group
if !plan_builder.selects_any() {
// ruled out entire row group
// ruled out entire row group — release any column chunks
// that were fetched for predicate evaluation.
self.release_row_group(row_group_idx);
self.filter = Some(filter_info.into_filter());
return Ok(NextState::result(
RowGroupDecoderState::Finished,
Expand Down Expand Up @@ -508,7 +548,9 @@ impl RowGroupReaderBuilder {
let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);

if rows_before == 0 {
// ruled out entire row group
// ruled out entire row group — release any column chunks
// that were fetched for predicate evaluation.
self.release_row_group(row_group_idx);
return Ok(NextState::result(
RowGroupDecoderState::Finished,
DecodeResult::Finished,
Expand All @@ -532,7 +574,9 @@ impl RowGroupReaderBuilder {
}

if rows_after == 0 {
// no rows left after applying limit/offset
// no rows left after applying limit/offset — release any
// column chunks that were fetched for predicate evaluation.
self.release_row_group(row_group_idx);
return Ok(NextState::result(
RowGroupDecoderState::Finished,
DecodeResult::Finished,
Expand Down Expand Up @@ -625,6 +669,10 @@ impl RowGroupReaderBuilder {
.build_array_reader(self.fields.as_deref(), &self.projection)
}?;

// Release column chunks now that all borrows from
// self.metadata / self.buffers are done.
self.release_row_group(row_group_idx);

let reader = ParquetRecordBatchReader::new(array_reader, plan);
NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
}
Expand Down
11 changes: 5 additions & 6 deletions parquet/src/arrow/push_decoder/remaining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ impl RemainingRowGroups {
self.row_group_reader_builder.buffered_bytes()
}

/// Clear any staged ranges currently buffered for future decode work
pub fn clear_all_ranges(&mut self) {
self.row_group_reader_builder.clear_all_ranges();
}

/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed
Expand Down Expand Up @@ -103,7 +98,11 @@ impl RemainingRowGroups {

// No current reader, proceed to the next row group if any
let row_group_idx = match self.row_groups.pop_front() {
None => return Ok(DecodeResult::Finished),
None => {
// We are done with the file, release all remaining buffers.
self.row_group_reader_builder.release_all();
return Ok(DecodeResult::Finished);
}
Some(idx) => idx,
};

Expand Down
20 changes: 7 additions & 13 deletions parquet/src/file/metadata/push_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::file::FOOTER_SIZE;
use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use bytes::Bytes;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -358,11 +357,6 @@ impl ParquetMetaDataPushDecoder {
Ok(())
}

/// Clear any staged byte ranges currently buffered for future decode work.
pub fn clear_all_ranges(&mut self) {
self.buffers.clear_all_ranges();
}

/// Try to decode the metadata from the pushed data, returning the
/// decoded metadata or an error if not enough data is available.
pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
Expand Down Expand Up @@ -397,10 +391,10 @@ impl ParquetMetaDataPushDecoder {
return Ok(needs_range(metadata_range));
}

let metadata = self.metadata_parser.decode_metadata(
&self.get_bytes(&metadata_range)?,
footer_tail.is_encrypted_footer(),
)?;
let metadata_bytes = self.get_bytes(&metadata_range)?;
let metadata = self
.metadata_parser
.decode_metadata(&metadata_bytes, footer_tail.is_encrypted_footer())?;
// Note: ReadingPageIndex first checks if page indexes are needed
// and is a no-op if not
self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
Expand Down Expand Up @@ -445,7 +439,7 @@ impl ParquetMetaDataPushDecoder {
}

/// Returns the bytes for the given range from the internal buffer
fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
fn get_bytes(&mut self, range: &Range<u64>) -> Result<Bytes> {
let start = range.start;
let raw_len = range.end - range.start;
let len: usize = raw_len.try_into().map_err(|_| {
Expand Down Expand Up @@ -579,7 +573,7 @@ mod tests {
}

#[test]
fn test_metadata_decoder_clear_all_ranges() {
fn test_metadata_decoder_release_all() {
let file_len = test_file_len();
let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();

Expand All @@ -588,7 +582,7 @@ mod tests {
.unwrap();
assert_eq!(metadata_decoder.buffers.buffered_bytes(), test_file_len());

metadata_decoder.clear_all_ranges();
metadata_decoder.buffers.release_all();
assert_eq!(metadata_decoder.buffers.buffered_bytes(), 0);

let ranges = expect_needs_data(metadata_decoder.try_decode());
Expand Down
4 changes: 3 additions & 1 deletion parquet/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub mod bit_util;
mod bit_pack;
pub(crate) mod interner;

pub mod push_buffers;
pub(crate) mod push_buffers;
#[cfg(feature = "arrow")]
pub(crate) mod retention;
#[cfg(any(test, feature = "test_common"))]
pub(crate) mod test_common;
pub mod utf8;
Expand Down
Loading
Loading