diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 24384471a4ec..c4d1c42a4b6a 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -28,6 +28,7 @@ use crate::arrow::arrow_reader::{
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use crate::util::push_buffers::PushBuffers;
+use crate::util::retention::RetentionSet;
 use arrow_array::RecordBatch;
 use bytes::Bytes;
 use reader_builder::RowGroupReaderBuilder;
@@ -185,6 +186,7 @@ impl ParquetPushDecoderBuilder {
         // Prepare to build RowGroup readers
         let file_len = 0; // not used in push decoder
         let buffers = PushBuffers::new(file_len);
+        let retention = RetentionSet::from_row_groups(&parquet_metadata, &row_groups);
         let row_group_reader_builder = RowGroupReaderBuilder::new(
             batch_size,
             projection,
@@ -197,6 +199,7 @@ impl ParquetPushDecoderBuilder {
             max_predicate_cache_size,
             buffers,
             row_selection_policy,
+            Some(retention),
         );

         // Initialize the decoder with the configured options
@@ -326,22 +329,25 @@ impl ParquetPushDecoder {
         Ok(decode_result)
     }

-    /// Push data into the decoder for processing
+    /// Push data into the decoder for processing.
     ///
     /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
-    /// single range of data.
-    ///
-    /// Note this can be the entire file or just a part of it. If it is part of the file,
-    /// the ranges should correspond to the data ranges requested by the decoder.
-    ///
-    /// See example in [`ParquetPushDecoderBuilder`]
+    /// single range of data. See [`Self::push_ranges`] for details.
     pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
         self.push_ranges(vec![range], vec![data])
     }

-    /// Push data into the decoder for processing
+    /// Push data into the decoder for processing.
     ///
-    /// This should correspond to the data ranges requested by the decoder
+    /// Each `(range, data)` pair associates a byte range in the Parquet file
+    /// with its contents. The pushed buffers do not need to align with the
+    /// ranges requested by [`DecodeResult::NeedsData`]: they may be smaller
+    /// (the decoder stitches adjacent buffers), larger (e.g. coalesced
+    /// fetches), or even cover offsets not yet requested (prefetch).
+    ///
+    /// The only requirement is that, by the time [`Self::try_decode`] is
+    /// called, the union of all pushed ranges must cover every byte the
+    /// decoder requested for the current decode step.
     pub fn push_ranges(
         &mut self,
         ranges: Vec<Range<u64>>,
@@ -365,15 +371,6 @@ impl ParquetPushDecoder {
     pub fn buffered_bytes(&self) -> u64 {
         self.state.buffered_bytes()
     }
-
-    /// Clear any staged byte ranges currently buffered for future decode work.
-    ///
-    /// This clears byte ranges still owned by the decoder's internal
-    /// `PushBuffers`. It does not affect any data that has already been handed
-    /// off to an active [`ParquetRecordBatchReader`].
-    pub fn clear_all_ranges(&mut self) {
-        self.state.clear_all_ranges();
-    }
 }

 /// Internal state machine for the [`ParquetPushDecoder`]
@@ -582,20 +579,6 @@ impl ParquetDecoderState {
             ParquetDecoderState::Finished => 0,
         }
     }
-
-    /// Clear any staged ranges currently buffered in the decoder.
-    fn clear_all_ranges(&mut self) {
-        match self {
-            ParquetDecoderState::ReadingRowGroup {
-                remaining_row_groups,
-            } => remaining_row_groups.clear_all_ranges(),
-            ParquetDecoderState::DecodingRowGroup {
-                record_batch_reader: _,
-                remaining_row_groups,
-            } => remaining_row_groups.clear_all_ranges(),
-            ParquetDecoderState::Finished => {}
-        }
-    }
 }

 #[cfg(test)]
@@ -612,12 +595,36 @@ mod test {
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int64Type;
     use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
-    use arrow_select::concat::concat_batches;
     use bytes::Bytes;
     use std::fmt::Debug;
     use std::ops::Range;
     use std::sync::{Arc, LazyLock};

+    /// Row group orderings to exercise in order-agnostic tests.
+    /// Each entry is (row_groups, expected_batches_in_decode_order).
+    ///
+    /// The test file has two row groups (200 rows each). We exercise
+    /// every useful ordering: forward, reverse, and single-RG to
+    /// confirm the decoder is fully order-independent.
+    fn all_orderings() -> Vec<(Vec<usize>, Vec<RecordBatch>)> {
+        vec![
+            // Forward: RG0 then RG1
+            (
+                vec![0, 1],
+                vec![TEST_BATCH.slice(0, 200), TEST_BATCH.slice(200, 200)],
+            ),
+            // Reverse: RG1 then RG0
+            (
+                vec![1, 0],
+                vec![TEST_BATCH.slice(200, 200), TEST_BATCH.slice(0, 200)],
+            ),
+            // Single: only RG0
+            (vec![0], vec![TEST_BATCH.slice(0, 200)]),
+            // Single: only RG1
+            (vec![1], vec![TEST_BATCH.slice(200, 200)]),
+        ]
+    }
+
     /// Test decoder struct size (as they are copied around on each transition, they
     /// should not grow too large)
     #[test]
@@ -629,112 +636,58 @@ mod test {
     /// available in memory
     #[test]
     fn test_decoder_all_data() {
-        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
-            .unwrap()
-            .build()
-            .unwrap();
-
-        decoder
-            .push_range(test_file_range(), TEST_FILE_DATA.clone())
-            .unwrap();
-
-        let results = vec![
-            // first row group should be decoded without needing more data
-            expect_data(decoder.try_decode()),
-            // second row group should be decoded without needing more data
-            expect_data(decoder.try_decode()),
-        ];
-        expect_finished(decoder.try_decode());
-
-        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
-        // Check that the output matches the input batch
-        assert_eq!(all_output, *TEST_BATCH);
+        for (row_groups, expected_batches) in all_orderings() {
+            let mut decoder =
+                ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+                    .unwrap()
+                    .with_row_groups(row_groups)
+                    .build()
+                    .unwrap();
+
+            decoder
+                .push_range(test_file_range(), TEST_FILE_DATA.clone())
+                .unwrap();
+
+            let results: Vec<_> = expected_batches
+                .iter()
+                .map(|_| expect_data(decoder.try_decode()))
+                .collect();
+            expect_finished(decoder.try_decode());

+            for (result, expected) in results.iter().zip(&expected_batches) {
+                assert_eq!(result, expected);
+            }
+        }
     }

     /// Decode the entire file incrementally, simulating a scenario where data is
     /// fetched as needed
     #[test]
     fn test_decoder_incremental() {
-        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
-            .unwrap()
-            .build()
-            .unwrap();
-
-        let mut results = vec![];
-
-        // First row group, expect a single request
-        let ranges = expect_needs_data(decoder.try_decode());
-        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
-        push_ranges_to_decoder(&mut decoder, ranges);
-        // The decoder should currently only store the data it needs to decode the first row group
-        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
-
results.push(expect_data(decoder.try_decode())); - // the decoder should have consumed the data for the first row group and freed it - assert_eq!(decoder.buffered_bytes(), 0); - - // Second row group, - let ranges = expect_needs_data(decoder.try_decode()); - let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - push_ranges_to_decoder(&mut decoder, ranges); - // The decoder should currently only store the data it needs to decode the second row group - assert_eq!(decoder.buffered_bytes(), num_bytes_requested); - results.push(expect_data(decoder.try_decode())); - // the decoder should have consumed the data for the second row group and freed it - assert_eq!(decoder.buffered_bytes(), 0); - expect_finished(decoder.try_decode()); - - // Check that the output matches the input batch - let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap(); - assert_eq!(all_output, *TEST_BATCH); - } - - /// Releasing staged ranges should free speculative buffers without affecting - /// the active row group reader. - #[test] - fn test_decoder_clear_all_ranges() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_batch_size(100) - .build() - .unwrap(); - - decoder - .push_range(test_file_range(), TEST_FILE_DATA.clone()) - .unwrap(); - assert_eq!(decoder.buffered_bytes(), test_file_len()); - - // The current row group reader is built from the prefetched bytes, but - // the speculative full-file range remains staged in the decoder. - let batch1 = expect_data(decoder.try_decode()); - assert_eq!(batch1, TEST_BATCH.slice(0, 100)); - assert_eq!(decoder.buffered_bytes(), test_file_len()); - - // All of the buffer is released - decoder.clear_all_ranges(); - assert_eq!(decoder.buffered_bytes(), 0); - - // The active reader still owns the current row group's bytes, so it can - // continue decoding without consulting PushBuffers. - let batch2 = expect_data(decoder.try_decode()); - assert_eq!(batch2, TEST_BATCH.slice(100, 100)); - assert_eq!(decoder.buffered_bytes(), 0); - - // Moving to the next row group now requires the decoder to ask for data - // again because the staged speculative ranges were released. 
- let ranges = expect_needs_data(decoder.try_decode()); - let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - push_ranges_to_decoder(&mut decoder, ranges); - assert_eq!(decoder.buffered_bytes(), num_bytes_requested); - - let batch3 = expect_data(decoder.try_decode()); - assert_eq!(batch3, TEST_BATCH.slice(200, 100)); - assert_eq!(decoder.buffered_bytes(), 0); - - let batch4 = expect_data(decoder.try_decode()); - assert_eq!(batch4, TEST_BATCH.slice(300, 100)); - assert_eq!(decoder.buffered_bytes(), 0); + for (row_groups, expected_batches) in all_orderings() { + let mut decoder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(row_groups) + .build() + .unwrap(); + + let mut results = vec![]; + + for _ in &expected_batches { + let ranges = expect_needs_data(decoder.try_decode()); + let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + push_ranges_to_decoder(&mut decoder, ranges); + assert_eq!(decoder.buffered_bytes(), num_bytes_requested); + results.push(expect_data(decoder.try_decode())); + assert_eq!(decoder.buffered_bytes(), 0); + } + expect_finished(decoder.try_decode()); - expect_finished(decoder.try_decode()); + for (result, expected) in results.iter().zip(&expected_batches) { + assert_eq!(result, expected); + } + } } /// Decode the entire file incrementally, simulating partial reads @@ -1167,6 +1120,127 @@ mod test { expect_finished(decoder.try_decode()); } + /// Decode the file pushed as fixed-size streaming parts, simulating a + /// single GET request that yields part-sized buffers. Part boundaries are + /// intentionally misaligned with column chunk / page boundaries. + #[test] + fn test_decoder_streaming_parts() { + let part_size = 512usize; // misaligned with column chunks + for (row_groups, expected_batches) in all_orderings() { + let mut decoder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(row_groups) + .build() + .unwrap(); + + // Push the entire file as fixed-size parts. + let file_len = TEST_FILE_DATA.len(); + let mut offset = 0usize; + while offset < file_len { + let end = (offset + part_size).min(file_len); + let range = (offset as u64)..(end as u64); + let data = TEST_FILE_DATA.slice(offset..end); + decoder.push_range(range, data).unwrap(); + offset = end; + } + + // Decode all row groups — stitching should handle cross-part reads. + let results: Vec<_> = expected_batches + .iter() + .map(|_| expect_data(decoder.try_decode())) + .collect(); + expect_finished(decoder.try_decode()); + + for (result, expected) in results.iter().zip(&expected_batches) { + assert_eq!(result, expected); + } + } + } + + /// Push the entire file, decode both row groups, and verify that the + /// internal automatic release frees buffers after each row group. + #[test] + fn test_decoder_automatic_release() { + let metadata = test_file_parquet_metadata(); + for (row_groups, expected_batches) in all_orderings() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone()) + .unwrap() + .with_row_groups(row_groups.clone()) + .build() + .unwrap(); + + // Total retained bytes = sum of column chunk sizes for queued RGs + // (the retention filter discards footer and non-queued data). 
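+            // `byte_range()` returns `(start, len)` for each column chunk, so
+            // summing the second element totals the chunk lengths in bytes.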
+            let retained_bytes: u64 = row_groups
+                .iter()
+                .flat_map(|&rg| metadata.row_group(rg).columns().iter())
+                .map(|c| c.byte_range().1)
+                .sum();
+
+            decoder
+                .push_range(test_file_range(), TEST_FILE_DATA.clone())
+                .unwrap();
+            assert_eq!(decoder.buffered_bytes(), retained_bytes);
+
+            // Decode each row group and verify incremental release.
+            let mut released: u64 = 0;
+            for (i, (expected, &rg_idx)) in
+                expected_batches.iter().zip(row_groups.iter()).enumerate()
+            {
+                let batch = expect_data(decoder.try_decode());
+                assert_eq!(&batch, expected, "mismatch at row group {i}");
+
+                let rg_size: u64 = metadata
+                    .row_group(rg_idx)
+                    .columns()
+                    .iter()
+                    .map(|c| c.byte_range().1)
+                    .sum();
+                released += rg_size;
+                assert_eq!(decoder.buffered_bytes(), retained_bytes - released);
+            }
+            expect_finished(decoder.try_decode());
+        }
+    }
+
+    /// Push the entire file to a decoder that only reads one row group.
+    /// Verify that speculative prefetch data for the other row group is
+    /// never admitted, so buffered_bytes is zero after decoding.
+    #[test]
+    fn test_decoder_filters_speculative_prefetch() {
+        let metadata = test_file_parquet_metadata();
+        for rg_idx in 0..metadata.num_row_groups() {
+            let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
+                .unwrap()
+                .with_row_groups(vec![rg_idx])
+                .build()
+                .unwrap();
+
+            // IO layer pushes the entire file (simulating aggressive prefetch).
+            decoder
+                .push_range(test_file_range(), TEST_FILE_DATA.clone())
+                .unwrap();
+
+            // Only the selected row group's column chunks should be retained.
+            let rg_size: u64 = metadata
+                .row_group(rg_idx)
+                .columns()
+                .iter()
+                .map(|c| c.byte_range().1)
+                .sum();
+            assert_eq!(decoder.buffered_bytes(), rg_size);
+
+            // Decode successfully.
+            let batch = expect_data(decoder.try_decode());
+            assert_eq!(batch.num_rows(), 200);
+            expect_finished(decoder.try_decode());
+
+            // All data released.
+            assert_eq!(decoder.buffered_bytes(), 0);
+        }
+    }
+
     /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
     ///
     /// Note c is a different types (so the data page sizes will be different)
diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs
index 6fbc2090b06e..77f750fa9790 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/data.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs
@@ -23,7 +23,6 @@ use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRo
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::file::reader::ChunkReader;
 use crate::util::push_buffers::PushBuffers;
 use bytes::Bytes;
 use std::ops::Range;
@@ -55,7 +54,7 @@ impl DataRequest {
     }

     /// Returns the chunks from the buffers that satisfy this request
-    fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
+    fn get_chunks(&self, buffers: &mut PushBuffers) -> Result<Vec<Bytes>, ParquetError> {
         self.ranges
             .iter()
             .map(|range| {
@@ -72,10 +71,12 @@
             .collect()
     }

-    /// Create a new InMemoryRowGroup, and fill it with provided data
+    /// Create a new InMemoryRowGroup, and fill it with provided data.
     ///
-    /// Assumes that all needed data is present in the buffers
-    /// and clears any explicitly requested ranges
+    /// Assumes that all needed data is present in the buffers.
+    /// Does **not** release any buffers — the caller is responsible for
+    /// calling [`PushBuffers::release_range`] at the appropriate time
+    /// (typically after all column chunks for a row group are complete).
     pub fn try_into_in_memory_row_group<'a>(
         self,
         row_group_idx: usize,
@@ -88,7 +89,7 @@ impl DataRequest {

         let Self {
             column_chunks,
-            ranges,
+            ranges: _,
             page_start_offsets,
         } = self;

@@ -105,9 +106,6 @@ impl DataRequest {

         in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks);

-        // Clear the ranges that were explicitly requested
-        buffers.clear_ranges(&ranges);
-
         Ok(in_memory_row_group)
     }
 }
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 922d8070c064..ba9dd44d244c 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -34,6 +34,7 @@ use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::util::push_buffers::PushBuffers;
+use crate::util::retention::RetentionSet;
 use bytes::Bytes;
 use data::DataRequest;
 use filter::AdvanceResult;
@@ -168,6 +169,12 @@ pub(crate) struct RowGroupReaderBuilder {

     /// The underlying data store
     buffers: PushBuffers,
+
+    /// Optional retention filter. When present, incoming `push_data` buffers
+    /// are trimmed to only keep byte ranges the decoder will eventually need.
+    /// The set is extended with ranges the decoder explicitly requests via
+    /// `NeedsData`, so the filter only discards truly speculative prefetch.
+    retention: Option<RetentionSet>,
 }

 impl RowGroupReaderBuilder {
@@ -185,6 +192,7 @@ impl RowGroupReaderBuilder {
         max_predicate_cache_size: usize,
         buffers: PushBuffers,
         row_selection_policy: RowSelectionPolicy,
+        retention: Option<RetentionSet>,
     ) -> Self {
         Self {
             batch_size,
@@ -199,12 +207,23 @@ impl RowGroupReaderBuilder {
             row_selection_policy,
             state: Some(RowGroupDecoderState::Finished),
             buffers,
+            retention,
         }
     }

-    /// Push new data buffers that can be used to satisfy pending requests
+    /// Push new data buffers that can be used to satisfy pending requests.
+    ///
+    /// When a [`RetentionSet`] is configured, incoming buffers are filtered so
+    /// that only byte ranges the decoder will eventually need are stored.
+    /// Portions outside the retention set are silently discarded.
     pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
-        self.buffers.push_ranges(ranges, buffers);
+        let (ranges, buffers) = match &self.retention {
+            Some(retention) => retention.filter(ranges, buffers),
+            None => (ranges, buffers),
+        };
+        if !ranges.is_empty() {
+            self.buffers.push_ranges(ranges, buffers);
+        }
     }

     /// Returns the total number of buffered bytes available
@@ -212,9 +231,21 @@ impl RowGroupReaderBuilder {
         self.buffers.buffered_bytes()
     }

-    /// Clear any staged ranges currently buffered for future decode work.
-    pub fn clear_all_ranges(&mut self) {
-        self.buffers.clear_all_ranges();
+    /// Release all staged ranges currently buffered for future decode work.
+    pub(crate) fn release_all(&mut self) {
+        self.buffers.release_all();
+    }
+
+    /// Release all column chunk byte ranges for a given row group.
+    ///
+    /// This is safe to call even if some (or all) column chunks were never
+    /// fetched — [`PushBuffers::release_range`] is a no-op for ranges that
+    /// have no overlapping entry.
+ fn release_row_group(&mut self, row_group_idx: usize) { + for col in self.metadata.row_group(row_group_idx).columns() { + let (start, len) = col.byte_range(); + self.buffers.release_range(start..start + len); + } } /// take the current state, leaving None in its place. @@ -280,6 +311,13 @@ impl RowGroupReaderBuilder { } => { // put back the next state self.state = Some(next_state); + // Extend the retention set with explicitly requested + // ranges so the IO layer's response is not filtered out. + if let DecodeResult::NeedsData(ref ranges) = result { + if let Some(retention) = &mut self.retention { + retention.extend(ranges); + } + } return Ok(result); } // completed one internal state, maybe can proceed further @@ -361,7 +399,9 @@ impl RowGroupReaderBuilder { // If nothing is selected, we are done with this row group if !plan_builder.selects_any() { - // ruled out entire row group + // ruled out entire row group — release any column chunks + // that were fetched for predicate evaluation. + self.release_row_group(row_group_idx); self.filter = Some(filter_info.into_filter()); return Ok(NextState::result( RowGroupDecoderState::Finished, @@ -508,7 +548,9 @@ impl RowGroupReaderBuilder { let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count); if rows_before == 0 { - // ruled out entire row group + // ruled out entire row group — release any column chunks + // that were fetched for predicate evaluation. + self.release_row_group(row_group_idx); return Ok(NextState::result( RowGroupDecoderState::Finished, DecodeResult::Finished, @@ -532,7 +574,9 @@ impl RowGroupReaderBuilder { } if rows_after == 0 { - // no rows left after applying limit/offset + // no rows left after applying limit/offset — release any + // column chunks that were fetched for predicate evaluation. + self.release_row_group(row_group_idx); return Ok(NextState::result( RowGroupDecoderState::Finished, DecodeResult::Finished, @@ -625,6 +669,10 @@ impl RowGroupReaderBuilder { .build_array_reader(self.fields.as_deref(), &self.projection) }?; + // Release column chunks now that all borrows from + // self.metadata / self.buffers are done. + self.release_row_group(row_group_idx); + let reader = ParquetRecordBatchReader::new(array_reader, plan); NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) } diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index 2986ca0da8d8..b18fc96c45c6 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -70,11 +70,6 @@ impl RemainingRowGroups { self.row_group_reader_builder.buffered_bytes() } - /// Clear any staged ranges currently buffered for future decode work - pub fn clear_all_ranges(&mut self) { - self.row_group_reader_builder.clear_all_ranges(); - } - /// returns [`ParquetRecordBatchReader`] suitable for reading the next /// group of rows from the Parquet data, or the list of data ranges still /// needed to proceed @@ -103,7 +98,11 @@ impl RemainingRowGroups { // No current reader, proceed to the next row group if any let row_group_idx = match self.row_groups.pop_front() { - None => return Ok(DecodeResult::Finished), + None => { + // We are done with the file, release all remaining buffers. 
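+                // This also drops any retained-but-never-consumed speculative
+                // data that the retention filter admitted.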
+                self.row_group_reader_builder.release_all();
+                return Ok(DecodeResult::Finished);
+            }
             Some(idx) => idx,
         };
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index 7e4beb5ad9c2..2ad1e0c232a8 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -23,7 +23,6 @@ use crate::file::FOOTER_SIZE;
 use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
 use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
 use crate::file::page_index::index_reader::acc_range;
-use crate::file::reader::ChunkReader;
 use bytes::Bytes;
 use std::ops::Range;
 use std::sync::Arc;
@@ -358,11 +357,6 @@ impl ParquetMetaDataPushDecoder {
         Ok(())
     }

-    /// Clear any staged byte ranges currently buffered for future decode work.
-    pub fn clear_all_ranges(&mut self) {
-        self.buffers.clear_all_ranges();
-    }
-
     /// Try to decode the metadata from the pushed data, returning the
     /// decoded metadata or an error if not enough data is available.
     pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
@@ -397,10 +391,10 @@ impl ParquetMetaDataPushDecoder {
                         return Ok(needs_range(metadata_range));
                     }

-                    let metadata = self.metadata_parser.decode_metadata(
-                        &self.get_bytes(&metadata_range)?,
-                        footer_tail.is_encrypted_footer(),
-                    )?;
+                    let metadata_bytes = self.get_bytes(&metadata_range)?;
+                    let metadata = self
+                        .metadata_parser
+                        .decode_metadata(&metadata_bytes, footer_tail.is_encrypted_footer())?;
                     // Note: ReadingPageIndex first checks if page indexes are needed
                     // and is a no-op if not
                     self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
@@ -445,7 +439,7 @@ impl ParquetMetaDataPushDecoder {
     }

     /// Returns the bytes for the given range from the internal buffer
-    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes, ParquetError> {
+    fn get_bytes(&mut self, range: &Range<u64>) -> Result<Bytes, ParquetError> {
         let start = range.start;
         let raw_len = range.end - range.start;
         let len: usize = raw_len.try_into().map_err(|_| {
@@ -579,7 +573,7 @@ mod tests {
     }

     #[test]
-    fn test_metadata_decoder_clear_all_ranges() {
+    fn test_metadata_decoder_release_all() {
         let file_len = test_file_len();
         let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();

@@ -588,7 +582,7 @@ mod tests {
             .unwrap();
         assert_eq!(metadata_decoder.buffers.buffered_bytes(), test_file_len());

-        metadata_decoder.clear_all_ranges();
+        metadata_decoder.buffers.release_all();
         assert_eq!(metadata_decoder.buffers.buffered_bytes(), 0);

         let ranges = expect_needs_data(metadata_decoder.try_decode());
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index 145cdd693e59..50eeb8cf4f4f 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -20,7 +20,9 @@ pub mod bit_util;
 mod bit_pack;
 pub(crate) mod interner;
-pub mod push_buffers;
+pub(crate) mod push_buffers;
+#[cfg(feature = "arrow")]
+pub(crate) mod retention;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
 pub mod utf8;
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index b8225ab3a1db..4d0a6bf535d0 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -16,38 +16,75 @@
 // under the License.
 use crate::errors::ParquetError;
-use crate::file::reader::{ChunkReader, Length};
-use bytes::Bytes;
+use crate::file::reader::Length;
+use bytes::{Bytes, BytesMut};
+use std::collections::BTreeMap;
 use std::fmt::Display;
 use std::ops::Range;

+/// Value stored in the [`PushBuffers`] B-tree for each IO buffer.
+/// The key is the buffer's start offset.
+#[derive(Debug, Clone)]
+struct BufferValue {
+    /// End offset (exclusive) of the byte range this buffer covers.
+    end: u64,
+    /// The raw data.
+    data: Bytes,
+    /// Number of bytes within this buffer that have not yet been released.
+    /// Initialized to `data.len()` and decremented by [`PushBuffers::release_range`].
+    /// When this reaches zero the entry is dropped.
+    live_bytes: u64,
+}
+
 /// Holds multiple buffers of data
 ///
 /// This is the in-memory buffer for the ParquetDecoder and ParquetMetadataDecoders
 ///
 /// Features:
-/// 1. Zero copy
-/// 2. non contiguous ranges of bytes
+/// 1. Non-contiguous ranges of bytes
+/// 2. Stitching: reads that span multiple contiguous physical buffers are
+///    resolved transparently. When a single buffer covers the request, the
+///    result is zero-copy ([`Bytes::slice`]). When multiple buffers must be
+///    stitched, the data is copied into a new allocation.
+///
+/// # No Coalescing
 ///
-/// # Non Coalescing
+/// This buffer does not coalesce (merging adjacent ranges of bytes into a single
+/// range). The IO layer is free to push arbitrarily sized buffers; they will be
+/// stitched on read if needed. Coalescing is left to the IO layer because it
+/// would require an extra copy here, and because the optimal coalescing
+/// strategy depends on the workload and storage medium (e.g. spinning disk,
+/// NVMe, blob storage), context that only the IO layer has.
 ///
-/// This buffer does not coalesce (merging adjacent ranges of bytes into a
-/// single range). Coalescing at this level would require copying the data but
-/// the caller may already have the needed data in a single buffer which would
-/// require no copying.
+/// # No Speculative Prefetching
 ///
-/// Thus, the implementation defers to the caller to coalesce subsequent requests
-/// if desired.
+/// This layer does not prefetch data ahead of what the decoder requests.
+/// The IO layer is free to push buffers at offsets not yet requested by
+/// the decoder — they will be held and stitched into future reads — but
+/// the decision of *whether* to prefetch, and *how much*, is left to the
+/// IO layer. Like coalescing, prefetching strategy depends on the storage
+/// medium and access pattern.
+///
+/// # Release model
+///
+/// Callers release byte ranges via [`release_range`](Self::release_range).
+/// Each buffer tracks a `live_bytes` counter that is decremented by the
+/// overlap between the released range and the buffer's range. When the
+/// counter reaches zero the buffer is dropped.
+///
+/// **Caller invariant:** each byte offset should be released at most once.
+/// Violating this causes double-counting, which may drop a buffer
+/// prematurely. This is safe but wasteful: the decoder's `NeedsData` retry loop
+/// will re-request the data.
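+///
+/// # Example
+///
+/// A minimal usage sketch (illustrative only; `file_len`, `part_a`, and
+/// `part_b` stand in for a real file length and `Bytes` buffers produced by
+/// the IO layer):
+///
+/// ```text
+/// let mut buffers = PushBuffers::new(file_len);
+/// // the IO layer pushes two adjacent, arbitrarily sized parts
+/// buffers.push_range(0..512, part_a);
+/// buffers.push_range(512..1024, part_b);
+/// // a read spanning the part boundary is stitched transparently
+/// let bytes = buffers.get_bytes(500, 24)?;
+/// // each byte offset is released exactly once when fully consumed
+/// buffers.release_range(0..1024);
+/// ```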
 #[derive(Debug, Clone)]
 pub(crate) struct PushBuffers {
     /// the virtual "offset" of this buffers (added to any request)
     offset: u64,
     /// The total length of the file being decoded
     file_len: u64,
-    /// The ranges of data that are available for decoding (not adjusted for offset)
-    ranges: Vec<Range<u64>>,
-    /// The buffers of data that can be used to decode the Parquet file
-    buffers: Vec<Bytes>,
+    /// IO buffers keyed by their start offset. Each value stores the end
+    /// offset, the data, and a live-byte counter for release tracking.
+    entries: BTreeMap<u64, BufferValue>,
 }

 impl Display for PushBuffers {
@@ -58,15 +95,16 @@
             self.offset, self.file_len
         )?;
         writeln!(f, "Available Ranges (w/ offset):")?;
-        for range in &self.ranges {
+        for (&start, entry) in &self.entries {
             writeln!(
                 f,
-                "  {}..{} ({}..{}): {} bytes",
-                range.start,
-                range.end,
-                range.start + self.offset,
-                range.end + self.offset,
-                range.end - range.start
+                "  {}..{} ({}..{}): {} bytes ({} live)",
+                start,
+                entry.end,
+                start + self.offset,
+                entry.end + self.offset,
+                entry.end - start,
+                entry.live_bytes,
             )?;
         }

@@ -75,17 +113,21 @@
 }

 impl PushBuffers {
-    /// Create a new Buffers instance with the given file length
+    /// Create a new Buffers instance with the given file length.
     pub fn new(file_len: u64) -> Self {
         Self {
             offset: 0,
             file_len,
-            ranges: Vec::new(),
-            buffers: Vec::new(),
+            entries: BTreeMap::new(),
         }
     }

-    /// Push all the ranges and buffers
+    /// Push a new range and its associated buffer.
+    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
+        self.push_ranges(vec![range], vec![buffer]);
+    }
+
+    /// Push all the ranges and buffers.
     pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
         assert_eq!(
             ranges.len(),
@@ -93,30 +135,46 @@
             "Number of ranges must match number of buffers"
         );
         for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
-            self.push_range(range, buffer);
+            assert_eq!(
+                (range.end - range.start) as usize,
+                buffer.len(),
+                "Range length must match buffer length"
+            );
+            let live_bytes = buffer.len() as u64;
+            self.entries.insert(
+                range.start,
+                BufferValue {
+                    end: range.end,
+                    data: buffer,
+                    live_bytes,
+                },
+            );
         }
     }

-    /// Push a new range and its associated buffer
-    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
-        assert_eq!(
-            (range.end - range.start) as usize,
-            buffer.len(),
-            "Range length must match buffer length"
-        );
-        self.ranges.push(range);
-        self.buffers.push(buffer);
-    }
-
-    /// Returns true if the Buffers contains data for the given range
+    /// Returns true if the Buffers contains data for the given range.
+    ///
+    /// This supports stitching: the range may span multiple contiguous physical
+    /// buffers (e.g. fixed-size streaming parts that don't align with column
+    /// chunk boundaries).
     pub fn has_range(&self, range: &Range<u64>) -> bool {
-        self.ranges
-            .iter()
-            .any(|r| r.start <= range.start && r.end >= range.end)
-    }
-
-    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
-        self.ranges.iter().zip(self.buffers.iter())
+        // Find the last buffer with start <= range.start.
+        let (_, first) = match self.entries.range(..=range.start).next_back() {
+            Some(entry) => entry,
+            None => return false,
+        };
+        let mut covered = first.end;
+        // Walk forward through contiguous buffers until we cover range.end.
+        for (&entry_start, entry) in self.entries.range((range.start + 1)..)
{
+            if covered >= range.end {
+                break;
+            }
+            if entry_start > covered {
+                return false;
+            }
+            covered = covered.max(entry.end);
+        }
+        covered >= range.end
     }

     /// return the file length of the Parquet file being read
@@ -124,41 +182,63 @@
         self.file_len
     }

-    /// Specify a new offset
-    pub fn with_offset(mut self, offset: u64) -> Self {
-        self.offset = offset;
-        self
+    /// Return the total of logically live (unreleased) buffered bytes.
+    #[cfg(feature = "arrow")]
+    pub fn buffered_bytes(&self) -> u64 {
+        self.entries.values().map(|e| e.live_bytes).sum()
     }

-    /// Return the total of all buffered ranges
+    /// Release all buffered ranges and their corresponding data.
     #[cfg(feature = "arrow")]
-    pub fn buffered_bytes(&self) -> u64 {
-        self.ranges.iter().map(|r| r.end - r.start).sum()
+    pub fn release_all(&mut self) {
+        self.entries.clear();
     }

-    /// Clear any range and corresponding buffer that is exactly in the ranges_to_clear
+    /// Release the given byte range.
+    ///
+    /// For each buffer that overlaps `range`, the overlap is subtracted from
+    /// the buffer's live-byte counter. When the counter reaches zero the
+    /// buffer is dropped.
+    ///
+    /// Caller invariant: each byte offset should be released at most once.
+    /// Double-releasing the same offset will over-decrement the counter, which
+    /// may drop a buffer prematurely. This is safe (the decoder's `NeedsData`
+    /// retry loop will re-request the data) but wasteful.
     #[cfg(feature = "arrow")]
-    pub fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
-        let mut new_ranges = Vec::new();
-        let mut new_buffers = Vec::new();
-
-        for (range, buffer) in self.iter() {
-            if !ranges_to_clear
-                .iter()
-                .any(|r| r.start == range.start && r.end == range.end)
-            {
-                new_ranges.push(range.clone());
-                new_buffers.push(buffer.clone());
-            }
+    pub fn release_range(&mut self, range: Range<u64>) {
+        if range.start >= range.end {
+            return;
         }
-        self.ranges = new_ranges;
-        self.buffers = new_buffers;
-    }

-    /// Clear all buffered ranges and their corresponding data
-    pub fn clear_all_ranges(&mut self) {
-        self.ranges.clear();
-        self.buffers.clear();
+        // Find the first entry that could overlap: the last entry with
+        // start <= range.start (its data may extend into the range).
+        let scan_start = self
+            .entries
+            .range(..=range.start)
+            .next_back()
+            .map(|(&k, _)| k)
+            .unwrap_or(range.start);
+
+        // Walk only the overlapping entries, collecting dead keys.
+        let mut dead_keys = Vec::new();
+        for (&start, value) in self.entries.range_mut(scan_start..range.end) {
+            if value.end <= range.start {
+                continue;
+            }
+            let overlap_start = start.max(range.start);
+            let overlap_end = value.end.min(range.end);
+            let overlap = overlap_end - overlap_start;
+            value.live_bytes = value
+                .live_bytes
+                .checked_sub(overlap)
+                .expect("release_range: overlap exceeds live_bytes — likely double-release");
+            if value.live_bytes == 0 {
+                dead_keys.push(start);
+            }
+        }
+        for key in dead_keys {
+            self.entries.remove(&key);
+        }
     }
 }

@@ -168,54 +248,410 @@
 impl Length for PushBuffers {
     fn len(&self) -> u64 {
         self.file_len
     }
 }

-/// less efficient implementation of Read for Buffers
+/// Implementation of Read for Buffers with stitching across adjacent buffers.
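+///
+/// Like any [`std::io::Read`] implementation, a call may return fewer bytes
+/// than requested: copying stops at the first gap in the buffered ranges and
+/// the number of bytes actually read is returned.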
 impl std::io::Read for PushBuffers {
     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        // Find the range that contains the start offset
-        let mut found = false;
-        for (range, data) in self.iter() {
-            if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
-                // Found the range, figure out the starting offset in the buffer
-                let start_offset = (self.offset - range.start) as usize;
-                let end_offset = start_offset + buf.len();
-                let slice = data.slice(start_offset..end_offset);
-                buf.copy_from_slice(slice.as_ref());
-                found = true;
+        let needed = buf.len() as u64;
+        let end = self.offset + needed;
+
+        // Find the last buffer with start <= self.offset.
+        let (&first_start, _) =
+            self.entries
+                .range(..=self.offset)
+                .next_back()
+                .ok_or_else(|| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::UnexpectedEof,
+                        "No data available in Buffers",
+                    )
+                })?;
+
+        let mut written = 0usize;
+        let mut cursor = self.offset;
+        for (&entry_start, entry) in self.entries.range(first_start..) {
+            if cursor >= end || entry_start > cursor {
                 break;
             }
+            if entry.end <= cursor {
+                // Skip buffers that lie entirely behind the cursor (e.g. a
+                // buffer nested inside an earlier, larger one); without this
+                // the slice below would panic with an inverted range.
+                continue;
+            }
+            let buf_start = (cursor - entry_start) as usize;
+            let buf_end = ((end.min(entry.end)) - entry_start) as usize;
+            let chunk = &entry.data[buf_start..buf_end];
+            buf[written..written + chunk.len()].copy_from_slice(chunk);
+            written += chunk.len();
+            cursor = entry.end.max(cursor);
         }
-        if found {
-            // If we found the range, we can return the number of bytes read
-            // advance our offset
-            self.offset += buf.len() as u64;
-            Ok(buf.len())
-        } else {
-            Err(std::io::Error::new(
+
+        if written == 0 {
+            return Err(std::io::Error::new(
                 std::io::ErrorKind::UnexpectedEof,
                 "No data available in Buffers",
-            ))
+            ));
         }
+        self.offset += written as u64;
+        Ok(written)
     }
 }

-impl ChunkReader for PushBuffers {
-    type T = Self;
+impl PushBuffers {
+    /// Look up bytes, returning a zero-copy slice when a single buffer
+    /// covers the request, or stitching across contiguous buffers otherwise.
+    pub fn get_bytes(&mut self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
+        let end = start + length as u64;

-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        Ok(self.clone().with_offset(self.offset + start))
-    }
+        // Find the last buffer with start_offset <= `start`.
+        let (&first_start, first) = self
+            .entries
+            .range(..=start)
+            .next_back()
+            .ok_or(ParquetError::NeedMoreDataRange(start..end))?;

-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        // find the range that contains the start offset
-        for (range, data) in self.iter() {
-            if range.start <= start && range.end >= start + length as u64 {
-                // Found the range, figure out the starting offset in the buffer
-                let start_offset = (start - range.start) as usize;
-                return Ok(data.slice(start_offset..start_offset + length));
+        // Fast path: single buffer covers the entire request (zero-copy).
+        if first.end >= end {
+            let off = (start - first_start) as usize;
+            return Ok(first.data.slice(off..off + length));
+        }
+
+        // Slow path: stitch across multiple contiguous buffers.
+        let mut buf = BytesMut::with_capacity(length);
+        let mut cursor = start;
+        for (&entry_start, entry) in self.entries.range(first_start..)
{
+            if cursor >= end {
+                break;
+            }
+            if entry_start > cursor {
+                return Err(ParquetError::NeedMoreDataRange(start..end));
+            }
+            if entry.end <= cursor {
+                // Skip buffers that lie entirely behind the cursor (e.g. a
+                // buffer nested inside an earlier, larger one); without this
+                // the slice below would panic with an inverted range.
+                continue;
+            }
+            let buf_start = (cursor - entry_start) as usize;
+            let buf_end = ((end.min(entry.end)) - entry_start) as usize;
+            buf.extend_from_slice(&entry.data[buf_start..buf_end]);
+            cursor = entry.end.max(cursor);
+        }
+        if cursor < end {
+            return Err(ParquetError::NeedMoreDataRange(start..end));
+        }
+        Ok(buf.freeze())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Read;
+
+    /// Helper: create PushBuffers with the given (start, data) pairs.
+    fn make_buffers(parts: &[(u64, &[u8])]) -> PushBuffers {
+        let file_len = parts
+            .iter()
+            .map(|(s, d)| s + d.len() as u64)
+            .max()
+            .unwrap_or(0);
+        let mut pb = PushBuffers::new(file_len);
+        for &(start, data) in parts {
+            let end = start + data.len() as u64;
+            pb.push_range(start..end, Bytes::copy_from_slice(data));
+        }
+        pb
+    }
+
+    // ---------------------------------------------------------------
+    // has_range
+    // ---------------------------------------------------------------
+
+    #[test]
+    fn has_range_single_buffer() {
+        let pb = make_buffers(&[(0, &[1, 2, 3, 4, 5])]);
+        assert!(pb.has_range(&(0..5)));
+        assert!(pb.has_range(&(1..4)));
+        assert!(!pb.has_range(&(0..6)));
+    }
+
+    #[test]
+    fn has_range_two_adjacent_buffers() {
+        let pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6])]);
+        assert!(pb.has_range(&(0..6)));
+        assert!(pb.has_range(&(2..5)));
+    }
+
+    #[test]
+    fn has_range_three_adjacent_buffers() {
+        let pb = make_buffers(&[(0, &[1, 2]), (2, &[3, 4]), (4, &[5, 6])]);
+        assert!(pb.has_range(&(0..6)));
+        assert!(pb.has_range(&(1..5)));
+    }
+
+    #[test]
+    fn has_range_gap() {
+        let pb = make_buffers(&[(0, &[1, 2]), (5, &[6, 7])]);
+        assert!(!pb.has_range(&(0..7)));
+        assert!(!pb.has_range(&(1..6)));
+        assert!(pb.has_range(&(0..2)));
+        assert!(pb.has_range(&(5..7)));
+    }
+
+    #[test]
+    fn has_range_before_any_buffer() {
+        let pb = make_buffers(&[(10, &[1, 2, 3])]);
+        assert!(!pb.has_range(&(0..5)));
+    }
+
+    #[test]
+    fn has_range_after_all_buffers() {
+        let pb = make_buffers(&[(0, &[1, 2, 3])]);
+        assert!(!pb.has_range(&(5..10)));
+    }
+
+    #[test]
+    fn has_range_overlapping_buffers() {
+        let pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (3, &[4, 5, 6, 7, 8])]);
+        assert!(pb.has_range(&(0..8)));
+        assert!(pb.has_range(&(2..7)));
+    }
+
+    // ---------------------------------------------------------------
+    // get_bytes
+    // ---------------------------------------------------------------
+
+    #[test]
+    fn get_bytes_single_buffer() {
+        let mut pb = make_buffers(&[(0, &[10, 20, 30, 40, 50])]);
+        let b = pb.get_bytes(1, 3).unwrap();
+        assert_eq!(&*b, &[20, 30, 40]);
+    }
+
+    #[test]
+    fn get_bytes_stitching_two_buffers() {
+        let mut pb = make_buffers(&[(0, &[10, 20, 30]), (3, &[40, 50, 60])]);
+        let b = pb.get_bytes(1, 4).unwrap();
+        assert_eq!(&*b, &[20, 30, 40, 50]);
+    }
+
+    #[test]
+    fn get_bytes_stitching_three_buffers() {
+        let mut pb = make_buffers(&[(0, &[1, 2]), (2, &[3, 4]), (4, &[5, 6])]);
+        let b = pb.get_bytes(0, 6).unwrap();
+        assert_eq!(&*b, &[1, 2, 3, 4, 5, 6]);
+    }
+
+    #[test]
+    fn get_bytes_gap_returns_error() {
+        let mut pb = make_buffers(&[(0, &[1, 2]), (5, &[6, 7])]);
+        let err = pb.get_bytes(0, 7).unwrap_err();
+        assert!(matches!(err, ParquetError::NeedMoreDataRange(_)));
+    }
+
+    #[test]
+    fn get_bytes_before_any_buffer() {
+        let mut pb = make_buffers(&[(10, &[1, 2, 3])]);
+        let err = pb.get_bytes(0, 5).unwrap_err();
+        assert!(matches!(err, ParquetError::NeedMoreDataRange(_)));
+    }
+
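+    #[test]
+    fn get_bytes_single_buffer_is_zero_copy() {
+        // Exercises the documented fast path: a request covered by a single
+        // buffer should come back as a zero-copy `Bytes::slice` into that
+        // buffer's allocation (illustrative companion to the tests above).
+        let mut pb = make_buffers(&[(0, &[10, 20, 30, 40, 50])]);
+        let src_ptr = pb.entries[&0].data.as_ptr();
+        let b = pb.get_bytes(1, 3).unwrap();
+        assert_eq!(&*b, &[20, 30, 40]);
+        // same allocation, offset by one byte
+        assert_eq!(b.as_ptr(), unsafe { src_ptr.add(1) });
+    }
+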
#[test] + fn get_bytes_extends_past_last_buffer() { + let mut pb = make_buffers(&[(0, &[1, 2, 3])]); + let err = pb.get_bytes(0, 10).unwrap_err(); + assert!(matches!(err, ParquetError::NeedMoreDataRange(_))); + } + + #[test] + fn get_bytes_overlapping_buffers() { + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (3, &[4, 5, 6, 7, 8])]); + let b = pb.get_bytes(0, 8).unwrap(); + assert_eq!(&*b, &[1, 2, 3, 4, 5, 6, 7, 8]); + } + + #[test] + fn get_bytes_overlapping_buffers_interior() { + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (3, &[4, 5, 6, 7, 8])]); + let b = pb.get_bytes(2, 5).unwrap(); + assert_eq!(&*b, &[3, 4, 5, 6, 7]); + } + + // --------------------------------------------------------------- + // Read impl + // --------------------------------------------------------------- + + #[test] + fn read_stitching_across_buffers() { + let mut pb = make_buffers(&[(0, &[10, 20, 30]), (3, &[40, 50, 60])]); + let mut buf = [0u8; 6]; + let n = pb.read(&mut buf).unwrap(); + assert_eq!(n, 6); + assert_eq!(buf, [10, 20, 30, 40, 50, 60]); + } + + #[test] + fn read_sequential_across_buffers() { + let mut pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6])]); + let mut buf = [0u8; 4]; + let n = pb.read(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, [1, 2, 3, 4]); + let mut buf2 = [0u8; 2]; + let n = pb.read(&mut buf2).unwrap(); + assert_eq!(n, 2); + assert_eq!(buf2, [5, 6]); + } + + #[test] + fn read_overlapping_buffers() { + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (3, &[4, 5, 6, 7, 8])]); + let mut buf = [0u8; 8]; + let n = pb.read(&mut buf).unwrap(); + assert_eq!(n, 8); + assert_eq!(buf, [1, 2, 3, 4, 5, 6, 7, 8]); + } + + #[test] + fn read_overlapping_buffers_sequential() { + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (3, &[4, 5, 6, 7, 8])]); + let mut buf1 = [0u8; 4]; + let n = pb.read(&mut buf1).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf1, [1, 2, 3, 4]); + let mut buf2 = [0u8; 4]; + let n = pb.read(&mut buf2).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf2, [5, 6, 7, 8]); + } + + // --------------------------------------------------------------- + // Out-of-order push (BTreeMap keeps order automatically) + // --------------------------------------------------------------- + + #[test] + fn out_of_order_push_has_range() { + let mut pb = PushBuffers::new(6); + pb.push_range(3..6, Bytes::from_static(&[4, 5, 6])); + pb.push_range(0..3, Bytes::from_static(&[1, 2, 3])); + assert!(pb.has_range(&(0..6))); + assert!(pb.has_range(&(1..5))); + } + + #[test] + fn out_of_order_push_get_bytes() { + let mut pb = PushBuffers::new(9); + pb.push_range(6..9, Bytes::from_static(&[7, 8, 9])); + pb.push_range(0..3, Bytes::from_static(&[1, 2, 3])); + pb.push_range(3..6, Bytes::from_static(&[4, 5, 6])); + let b = pb.get_bytes(0, 9).unwrap(); + assert_eq!(&*b, &[1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[test] + fn out_of_order_push_read() { + let mut pb = PushBuffers::new(6); + pb.push_range(3..6, Bytes::from_static(&[4, 5, 6])); + pb.push_range(0..3, Bytes::from_static(&[1, 2, 3])); + let mut buf = [0u8; 6]; + let n = pb.read(&mut buf).unwrap(); + assert_eq!(n, 6); + assert_eq!(buf, [1, 2, 3, 4, 5, 6]); + } + + // --------------------------------------------------------------- + // release_range + // --------------------------------------------------------------- + + #[cfg(feature = "arrow")] + mod release_tests { + use super::*; + + #[test] + fn release_range_exact_buffer() { + let mut pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6])]); + pb.release_range(0..3); + 
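+            // the first buffer's live-byte counter hit zero, so its entry is dropped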
assert_eq!(pb.entries.len(), 1); + assert!(pb.entries.contains_key(&3)); + assert_eq!(pb.buffered_bytes(), 3); + } + + #[test] + fn release_range_partial_overlap() { + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5]), (5, &[6, 7, 8, 9, 10])]); + assert_eq!(pb.buffered_bytes(), 10); + // Release middle section that spans both buffers. + pb.release_range(3..7); + // Both buffers still alive (partial release). + assert_eq!(pb.entries.len(), 2); + // First buffer: 5 - 2 = 3 live bytes + assert_eq!(pb.entries[&0].live_bytes, 3); + // Second buffer: 5 - 2 = 3 live bytes + assert_eq!(pb.entries[&5].live_bytes, 3); + assert_eq!(pb.buffered_bytes(), 6); + // Data is still accessible (live_bytes is bookkeeping only). + assert!(pb.has_range(&(0..10))); + } + + #[test] + fn release_range_drops_middle_buffer() { + let mut pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6]), (6, &[7, 8, 9])]); + pb.release_range(3..6); + assert_eq!(pb.entries.len(), 2); + assert!(pb.entries.contains_key(&0)); + assert!(pb.entries.contains_key(&6)); + } + + #[test] + fn release_range_no_overlap() { + let mut pb = make_buffers(&[(10, &[1, 2, 3])]); + pb.release_range(0..5); + assert_eq!(pb.entries.len(), 1); + assert_eq!(pb.buffered_bytes(), 3); + } + + #[test] + fn release_range_superset() { + let mut pb = make_buffers(&[(2, &[1, 2, 3, 4, 5, 6])]); + // Release range is larger than the buffer. + pb.release_range(0..10); + assert!(pb.entries.is_empty()); + assert_eq!(pb.buffered_bytes(), 0); + } + + #[test] + fn release_range_out_of_order() { + // Simulate non-sequential row group access: release "later" range first. + let mut pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6])]); + pb.release_range(3..6); + assert_eq!(pb.entries.len(), 1); + assert!(pb.entries.contains_key(&0)); + pb.release_range(0..3); + assert!(pb.entries.is_empty()); + } + + #[test] + fn release_range_empty() { + let mut pb = make_buffers(&[(0, &[1, 2, 3])]); + pb.release_range(5..5); + assert_eq!(pb.entries.len(), 1); + assert_eq!(pb.buffered_bytes(), 3); + } + + #[test] + fn release_range_incremental_on_single_buffer() { + // One big buffer, released in chunks (like multiple row groups). + let mut pb = make_buffers(&[(0, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])]); + assert_eq!(pb.buffered_bytes(), 10); + + pb.release_range(0..3); + assert_eq!(pb.entries.len(), 1); + assert_eq!(pb.buffered_bytes(), 7); + + pb.release_range(3..7); + assert_eq!(pb.entries.len(), 1); + assert_eq!(pb.buffered_bytes(), 3); + + pb.release_range(7..10); + assert!(pb.entries.is_empty()); + assert_eq!(pb.buffered_bytes(), 0); + } + + #[test] + fn buffered_bytes_tracks_live() { + let mut pb = make_buffers(&[(0, &[1, 2, 3]), (3, &[4, 5, 6])]); + assert_eq!(pb.buffered_bytes(), 6); + pb.release_range(0..6); + assert_eq!(pb.buffered_bytes(), 0); } - // Signal that we need more data - let requested_end = start + length as u64; - Err(ParquetError::NeedMoreDataRange(start..requested_end)) } } diff --git a/parquet/src/util/retention.rs b/parquet/src/util/retention.rs new file mode 100644 index 000000000000..d53320e44dc6 --- /dev/null +++ b/parquet/src/util/retention.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use std::ops::Range;
+
+use crate::file::metadata::ParquetMetaData;
+
+/// A sorted, non-overlapping set of byte ranges that the decoder expects to
+/// consume.
+///
+/// When attached to a `RowGroupReaderBuilder`, incoming buffers are filtered
+/// against this set: only the portions that overlap a retained range are
+/// stored. Everything else is silently discarded.
+///
+/// This prevents speculatively prefetched data for row groups the decoder will
+/// never process from accumulating in memory.
+#[derive(Debug, Clone)]
+pub(crate) struct RetentionSet {
+    /// Sorted, non-overlapping, merged ranges.
+    ranges: Vec<Range<u64>>,
+}
+
+impl RetentionSet {
+    /// Build a retention set from the column chunk byte ranges of the given
+    /// row groups.
+    ///
+    /// All column chunks (regardless of projection) for each queued row group
+    /// are included — this is a conservative superset of what the decoder will
+    /// actually read.
+    pub fn from_row_groups(metadata: &ParquetMetaData, row_groups: &[usize]) -> Self {
+        let total_cols: usize = row_groups
+            .iter()
+            .map(|&rg| metadata.row_group(rg).columns().len())
+            .sum();
+        let mut ranges: Vec<Range<u64>> = Vec::with_capacity(total_cols);
+        for &rg_idx in row_groups {
+            let rg = metadata.row_group(rg_idx);
+            for col in rg.columns() {
+                let (start, len) = col.byte_range();
+                ranges.push(start..start + len);
+            }
+        }
+        ranges.sort_unstable_by_key(|r| r.start);
+        let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
+        for range in ranges {
+            if let Some(last) = merged.last_mut() {
+                if range.start <= last.end {
+                    last.end = last.end.max(range.end);
+                    continue;
+                }
+            }
+            merged.push(range);
+        }
+        Self { ranges: merged }
+    }
+
+    /// Extend the retention set with additional ranges.
+    ///
+    /// This is called when the decoder explicitly requests ranges via
+    /// `NeedsData`. Any range the decoder needs must be admitted on push,
+    /// even if it wasn't in the original column-chunk-derived set.
+    ///
+    /// Empty ranges (`start >= end`) are skipped — they cannot be
+    /// represented in the interval set. Zero-length pushes are instead
+    /// passed straight through by [`Self::filter`].
+    pub fn extend(&mut self, ranges: &[Range<u64>]) {
+        let before = self.ranges.len();
+        for range in ranges {
+            if range.start >= range.end {
+                continue;
+            }
+            let insert_at = self.ranges.partition_point(|r| r.start < range.start);
+            self.ranges.insert(insert_at, range.clone());
+        }
+        if self.ranges.len() == before {
+            return; // nothing added
+        }
+        // Re-merge in case new ranges overlap or abut existing ones.
+        let mut merged: Vec<Range<u64>> = Vec::with_capacity(self.ranges.len());
+        for range in self.ranges.drain(..) {
+            if let Some(last) = merged.last_mut() {
+                if range.start <= last.end {
+                    last.end = last.end.max(range.end);
+                    continue;
+                }
+            }
+            merged.push(range);
+        }
+        self.ranges = merged;
+    }
+
+    /// Filter incoming ranges and buffers, keeping only the portions that
+    /// overlap the retention set.
+    ///
+    /// Each retained portion is a zero-copy [`Bytes::slice`] of the original
+    /// buffer.
Portions that fall entirely outside the retention set are
+    /// dropped.
+    pub fn filter(
+        &self,
+        ranges: Vec<Range<u64>>,
+        buffers: Vec<Bytes>,
+    ) -> (Vec<Range<u64>>, Vec<Bytes>) {
+        let mut out_ranges = Vec::new();
+        let mut out_buffers = Vec::new();
+
+        for (range, buffer) in ranges.into_iter().zip(buffers) {
+            // Zero-length ranges always pass through.
+            if range.start >= range.end {
+                out_ranges.push(range);
+                out_buffers.push(buffer);
+                continue;
+            }
+
+            // Find the first retention range that could overlap: the first
+            // whose end is past range.start.
+            let start_idx = self.ranges.partition_point(|r| r.end <= range.start);
+
+            for ret in &self.ranges[start_idx..] {
+                if ret.start >= range.end {
+                    break;
+                }
+                let overlap_start = range.start.max(ret.start);
+                let overlap_end = range.end.min(ret.end);
+                let buf_offset = (overlap_start - range.start) as usize;
+                let buf_len = (overlap_end - overlap_start) as usize;
+                out_ranges.push(overlap_start..overlap_end);
+                out_buffers.push(buffer.slice(buf_offset..buf_offset + buf_len));
+            }
+        }
+
+        (out_ranges, out_buffers)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    #![allow(clippy::single_range_in_vec_init)]
+    use super::*;
+
+    fn make_retention(ranges: &[Range<u64>]) -> RetentionSet {
+        let mut sorted: Vec<Range<u64>> = ranges.to_vec();
+        sorted.sort_unstable_by_key(|r| r.start);
+        let mut merged: Vec<Range<u64>> = Vec::new();
+        for range in sorted {
+            if let Some(last) = merged.last_mut() {
+                if range.start <= last.end {
+                    last.end = last.end.max(range.end);
+                    continue;
+                }
+            }
+            merged.push(range);
+        }
+        RetentionSet { ranges: merged }
+    }
+
+    #[test]
+    fn exact_match() {
+        let ret = make_retention(&[10..20]);
+        let buf = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        let (ranges, buffers) = ret.filter(vec![10..20], vec![buf]);
+        assert_eq!(ranges, vec![10..20]);
+        assert_eq!(buffers.len(), 1);
+        assert_eq!(&*buffers[0], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+    }
+
+    #[test]
+    fn no_overlap() {
+        let ret = make_retention(&[10..20]);
+        let buf = Bytes::from_static(&[1, 2, 3]);
+        let (ranges, buffers) = ret.filter(vec![0..3], vec![buf]);
+        assert!(ranges.is_empty());
+        assert!(buffers.is_empty());
+    }
+
+    #[test]
+    fn partial_overlap_left() {
+        let ret = make_retention(&[10..20]);
+        let buf = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        // Buffer covers 5..15, retention is 10..20 → keep 10..15
+        let (ranges, buffers) = ret.filter(vec![5..15], vec![buf]);
+        assert_eq!(ranges, vec![10..15]);
+        assert_eq!(&*buffers[0], &[6, 7, 8, 9, 10]);
+    }
+
+    #[test]
+    fn partial_overlap_right() {
+        let ret = make_retention(&[10..20]);
+        let buf = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        // Buffer covers 15..25, retention is 10..20 → keep 15..20
+        let (ranges, buffers) = ret.filter(vec![15..25], vec![buf]);
+        assert_eq!(ranges, vec![15..20]);
+        assert_eq!(&*buffers[0], &[1, 2, 3, 4, 5]);
+    }
+
+    #[test]
+    fn buffer_spans_gap_between_retention_ranges() {
+        // Retention: [10..20) and [30..40). Buffer covers 5..45.
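+        // Expect two output slices (one per retained range); the 20..30 gap
+        // and the 5..10 / 40..45 fringes are dropped.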
+        let ret = make_retention(&[10..20, 30..40]);
+        let data: Vec<u8> = (0..40).collect();
+        let buf = Bytes::from(data);
+        let (ranges, buffers) = ret.filter(vec![5..45], vec![buf]);
+        assert_eq!(ranges, vec![10..20, 30..40]);
+        assert_eq!(buffers.len(), 2);
+        // First slice: bytes at offset 5..15 in the buffer (values 5..15)
+        assert_eq!(&*buffers[0], &[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]);
+        // Second slice: bytes at offset 25..35 in the buffer (values 25..35)
+        assert_eq!(&*buffers[1], &[25, 26, 27, 28, 29, 30, 31, 32, 33, 34]);
+    }
+
+    #[test]
+    fn superset_buffer_trimmed() {
+        let ret = make_retention(&[10..20]);
+        let data: Vec<u8> = (0..50).collect();
+        let buf = Bytes::from(data);
+        let (ranges, buffers) = ret.filter(vec![0..50], vec![buf]);
+        assert_eq!(ranges, vec![10..20]);
+        assert_eq!(&*buffers[0], &[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
+    }
+
+    #[test]
+    fn empty_retention_discards_everything() {
+        let ret = RetentionSet { ranges: Vec::new() };
+        let buf = Bytes::from_static(&[1, 2, 3]);
+        let (ranges, buffers) = ret.filter(vec![0..3], vec![buf]);
+        assert!(ranges.is_empty());
+        assert!(buffers.is_empty());
+    }
+
+    #[test]
+    fn multiple_input_buffers() {
+        let ret = make_retention(&[10..20, 30..40]);
+        let buf1 = Bytes::from_static(&[1, 2, 3, 4, 5]);
+        let buf2 = Bytes::from_static(&[1, 2, 3, 4, 5]);
+        let buf3 = Bytes::from_static(&[1, 2, 3, 4, 5]);
+        let (ranges, buffers) = ret.filter(vec![0..5, 10..15, 35..40], vec![buf1, buf2, buf3]);
+        // First buffer: no overlap. Second: exact. Third: exact.
+        assert_eq!(ranges, vec![10..15, 35..40]);
+        assert_eq!(buffers.len(), 2);
+    }
+
+    #[test]
+    fn zero_copy_slicing() {
+        let ret = make_retention(&[10..20]);
+        let data: Vec<u8> = (0..30).collect();
+        let buf = Bytes::from(data);
+        let original_ptr = buf.as_ptr();
+        let (_, buffers) = ret.filter(vec![0..30], vec![buf]);
+        // The output slice should point into the same allocation,
+        // offset by 10 bytes.
+        assert_eq!(buffers[0].as_ptr(), unsafe { original_ptr.add(10) });
+    }
+
+    #[test]
+    fn adjacent_retention_ranges_are_merged() {
+        // Two abutting ranges should merge into one.
+        let ret = make_retention(&[10..20, 20..30]);
+        assert_eq!(ret.ranges, vec![10..30]);
+        let data: Vec<u8> = (0..40).collect();
+        let buf = Bytes::from(data);
+        let (ranges, buffers) = ret.filter(vec![0..40], vec![buf]);
+        // Should produce a single slice, not two.
+        assert_eq!(ranges, vec![10..30]);
+        assert_eq!(buffers.len(), 1);
+    }
+}