Skip to content

Commit 58ef433

Browse files
committed
feat(parquet): add retention filter to discard unrequested prefetch data
Per-range release guarantees that every consumed row group is freed. But when the IO layer prefetches aggressively (e.g., streaming entire files or over-reading across row group boundaries), data for row groups the decoder will never process enters `PushBuffers` and is never consumed, so `release_range` is never called for it. Add `RetentionSet`, a sorted set of byte ranges derived from column chunk metadata for the queued row groups. Incoming buffers are filtered at push time: only portions overlapping a retained range are stored (zero-copy via `Bytes::slice`); everything else is discarded before reaching `PushBuffers`. Together with the per-range release in the previous commit, this closes the loop on memory management: the IO layer is free to push data in any shape — coalesced, prefetched, uniform-sized, or even the entire file — without knowledge of Parquet layout. The decoder admits only what it will consume, and releases it at row-group boundaries. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent d56a440 commit 58ef433

4 files changed

Lines changed: 263 additions & 2 deletions

File tree

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::arrow::arrow_reader::{
2828
use crate::errors::ParquetError;
2929
use crate::file::metadata::ParquetMetaData;
3030
use crate::util::push_buffers::PushBuffers;
31+
use crate::util::retention::RetentionSet;
3132
use arrow_array::RecordBatch;
3233
use bytes::Bytes;
3334
use reader_builder::RowGroupReaderBuilder;
@@ -185,6 +186,7 @@ impl ParquetPushDecoderBuilder {
185186
// Prepare to build RowGroup readers
186187
let file_len = 0; // not used in push decoder
187188
let buffers = PushBuffers::new(file_len);
189+
let retention = RetentionSet::from_row_groups(&parquet_metadata, &row_groups);
188190
let row_group_reader_builder = RowGroupReaderBuilder::new(
189191
batch_size,
190192
projection,
@@ -197,6 +199,7 @@ impl ParquetPushDecoderBuilder {
197199
max_predicate_cache_size,
198200
buffers,
199201
row_selection_policy,
202+
Some(retention),
200203
);
201204

202205
// Initialize the decoder with the configured options

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::errors::ParquetError;
3434
use crate::file::metadata::ParquetMetaData;
3535
use crate::file::page_index::offset_index::OffsetIndexMetaData;
3636
use crate::util::push_buffers::PushBuffers;
37+
use crate::util::retention::RetentionSet;
3738
use bytes::Bytes;
3839
use data::DataRequest;
3940
use filter::AdvanceResult;
@@ -168,6 +169,10 @@ pub(crate) struct RowGroupReaderBuilder {
168169

169170
/// The underlying data store
170171
buffers: PushBuffers,
172+
173+
/// Optional retention filter. When present, incoming `push_data` buffers
174+
/// are trimmed to only keep byte ranges the decoder will eventually need.
175+
retention: Option<RetentionSet>,
171176
}
172177

173178
impl RowGroupReaderBuilder {
@@ -185,6 +190,7 @@ impl RowGroupReaderBuilder {
185190
max_predicate_cache_size: usize,
186191
buffers: PushBuffers,
187192
row_selection_policy: RowSelectionPolicy,
193+
retention: Option<RetentionSet>,
188194
) -> Self {
189195
Self {
190196
batch_size,
@@ -199,12 +205,23 @@ impl RowGroupReaderBuilder {
199205
row_selection_policy,
200206
state: Some(RowGroupDecoderState::Finished),
201207
buffers,
208+
retention,
202209
}
203210
}
204211

205-
/// Push new data buffers that can be used to satisfy pending requests
212+
/// Push new data buffers that can be used to satisfy pending requests.
213+
///
214+
/// When a [`RetentionSet`] is configured, incoming buffers are filtered so
215+
/// that only byte ranges the decoder will eventually need are stored.
216+
/// Portions outside the retention set are silently discarded.
206217
pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
207-
self.buffers.push_ranges(ranges, buffers);
218+
let (ranges, buffers) = match &self.retention {
219+
Some(retention) => retention.filter(ranges, buffers),
220+
None => (ranges, buffers),
221+
};
222+
if !ranges.is_empty() {
223+
self.buffers.push_ranges(ranges, buffers);
224+
}
208225
}
209226

210227
/// Returns the total number of buffered bytes available

parquet/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod bit_pack;
2121
pub(crate) mod interner;
2222

2323
pub(crate) mod push_buffers;
24+
pub(crate) mod retention;
2425
#[cfg(any(test, feature = "test_common"))]
2526
pub(crate) mod test_common;
2627
pub mod utf8;

parquet/src/util/retention.rs

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use bytes::Bytes;
19+
use std::ops::Range;
20+
21+
use crate::file::metadata::ParquetMetaData;
22+
23+
/// A sorted, non-overlapping set of byte ranges that the decoder expects to
/// consume.
///
/// When attached to a [`RowGroupReaderBuilder`], incoming buffers are filtered
/// against this set: only the portions that overlap a retained range are
/// stored. Everything else is silently discarded.
///
/// This prevents speculatively prefetched data for row groups the decoder will
/// never process from accumulating in memory.
///
/// [`RowGroupReaderBuilder`]: crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder
#[derive(Debug, Clone)]
pub(crate) struct RetentionSet {
    /// Sorted, non-overlapping, merged ranges.
    ///
    /// Invariant (established by `from_row_groups`): ascending by `start`,
    /// with overlapping or abutting ranges merged. `filter` relies on this
    /// ordering for its `partition_point` binary search.
    ranges: Vec<Range<u64>>,
}
39+
40+
impl RetentionSet {
41+
/// Build a retention set from the column chunk byte ranges of the given
42+
/// row groups.
43+
///
44+
/// All column chunks (regardless of projection) for each queued row group
45+
/// are included — this is a conservative superset of what the decoder will
46+
/// actually read.
47+
pub fn from_row_groups(metadata: &ParquetMetaData, row_groups: &[usize]) -> Self {
48+
let total_cols: usize = row_groups
49+
.iter()
50+
.map(|&rg| metadata.row_group(rg).columns().len())
51+
.sum();
52+
let mut ranges: Vec<Range<u64>> = Vec::with_capacity(total_cols);
53+
for &rg_idx in row_groups {
54+
let rg = metadata.row_group(rg_idx);
55+
for col in rg.columns() {
56+
let (start, len) = col.byte_range();
57+
ranges.push(start..start + len);
58+
}
59+
}
60+
ranges.sort_unstable_by_key(|r| r.start);
61+
let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
62+
for range in ranges {
63+
if let Some(last) = merged.last_mut() {
64+
if range.start <= last.end {
65+
last.end = last.end.max(range.end);
66+
continue;
67+
}
68+
}
69+
merged.push(range);
70+
}
71+
Self { ranges: merged }
72+
}
73+
74+
/// Filter incoming ranges and buffers, keeping only the portions that
75+
/// overlap the retention set.
76+
///
77+
/// Each retained portion is a zero-copy [`Bytes::slice`] of the original
78+
/// buffer. Portions that fall entirely outside the retention set are
79+
/// dropped.
80+
pub fn filter(
81+
&self,
82+
ranges: Vec<Range<u64>>,
83+
buffers: Vec<Bytes>,
84+
) -> (Vec<Range<u64>>, Vec<Bytes>) {
85+
let mut out_ranges = Vec::new();
86+
let mut out_buffers = Vec::new();
87+
88+
for (range, buffer) in ranges.into_iter().zip(buffers) {
89+
// Find the first retention range that could overlap: the first
90+
// whose end is past range.start.
91+
let start_idx = self.ranges.partition_point(|r| r.end <= range.start);
92+
93+
for ret in &self.ranges[start_idx..] {
94+
if ret.start >= range.end {
95+
break;
96+
}
97+
let overlap_start = range.start.max(ret.start);
98+
let overlap_end = range.end.min(ret.end);
99+
let buf_offset = (overlap_start - range.start) as usize;
100+
let buf_len = (overlap_end - overlap_start) as usize;
101+
out_ranges.push(overlap_start..overlap_end);
102+
out_buffers.push(buffer.slice(buf_offset..buf_offset + buf_len));
103+
}
104+
}
105+
106+
(out_ranges, out_buffers)
107+
}
108+
}
109+
110+
#[cfg(test)]
mod tests {
    #![allow(clippy::single_range_in_vec_init)]
    use super::*;

    /// Build a `RetentionSet` directly from arbitrary ranges, applying the
    /// same sort-then-merge normalization as `from_row_groups`.
    fn make_retention(ranges: &[Range<u64>]) -> RetentionSet {
        let mut sorted: Vec<Range<u64>> = ranges.to_vec();
        sorted.sort_unstable_by_key(|r| r.start);
        let merged = sorted
            .into_iter()
            .fold(Vec::<Range<u64>>::new(), |mut acc, range| {
                match acc.last_mut() {
                    // Overlapping or abutting: extend the previous range.
                    Some(prev) if range.start <= prev.end => {
                        prev.end = prev.end.max(range.end);
                    }
                    _ => acc.push(range),
                }
                acc
            });
        RetentionSet { ranges: merged }
    }

    #[test]
    fn exact_match() {
        // A buffer whose range coincides with a retention range is kept whole.
        let ret = make_retention(&[10..20]);
        let input = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (ranges, buffers) = ret.filter(vec![10..20], vec![input]);
        assert_eq!(ranges, vec![10..20]);
        assert_eq!(buffers.len(), 1);
        assert_eq!(&*buffers[0], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn no_overlap() {
        // A buffer entirely outside the retention set is discarded.
        let ret = make_retention(&[10..20]);
        let input = Bytes::from_static(&[1, 2, 3]);
        let (ranges, buffers) = ret.filter(vec![0..3], vec![input]);
        assert!(ranges.is_empty());
        assert!(buffers.is_empty());
    }

    #[test]
    fn partial_overlap_left() {
        // Buffer covers 5..15, retention is 10..20 → keep 10..15.
        let ret = make_retention(&[10..20]);
        let input = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (ranges, buffers) = ret.filter(vec![5..15], vec![input]);
        assert_eq!(ranges, vec![10..15]);
        assert_eq!(&*buffers[0], &[6, 7, 8, 9, 10]);
    }

    #[test]
    fn partial_overlap_right() {
        // Buffer covers 15..25, retention is 10..20 → keep 15..20.
        let ret = make_retention(&[10..20]);
        let input = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let (ranges, buffers) = ret.filter(vec![15..25], vec![input]);
        assert_eq!(ranges, vec![15..20]);
        assert_eq!(&*buffers[0], &[1, 2, 3, 4, 5]);
    }

    #[test]
    fn buffer_spans_gap_between_retention_ranges() {
        // Retention: [10..20) and [30..40). One buffer covering 5..45 is
        // split into two slices, one per retained range.
        let ret = make_retention(&[10..20, 30..40]);
        let payload: Vec<u8> = (0..40).collect();
        let input = Bytes::from(payload);
        let (ranges, buffers) = ret.filter(vec![5..45], vec![input]);
        assert_eq!(ranges, vec![10..20, 30..40]);
        assert_eq!(buffers.len(), 2);
        // First slice: bytes at offset 5..15 in the buffer (values 5..15).
        assert_eq!(&*buffers[0], &[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]);
        // Second slice: bytes at offset 25..35 in the buffer (values 25..35).
        assert_eq!(&*buffers[1], &[25, 26, 27, 28, 29, 30, 31, 32, 33, 34]);
    }

    #[test]
    fn superset_buffer_trimmed() {
        // A buffer strictly containing the retention range is trimmed to it.
        let ret = make_retention(&[10..20]);
        let payload: Vec<u8> = (0..50).collect();
        let input = Bytes::from(payload);
        let (ranges, buffers) = ret.filter(vec![0..50], vec![input]);
        assert_eq!(ranges, vec![10..20]);
        assert_eq!(&*buffers[0], &[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
    }

    #[test]
    fn empty_retention_discards_everything() {
        let ret = RetentionSet { ranges: Vec::new() };
        let input = Bytes::from_static(&[1, 2, 3]);
        let (ranges, buffers) = ret.filter(vec![0..3], vec![input]);
        assert!(ranges.is_empty());
        assert!(buffers.is_empty());
    }

    #[test]
    fn multiple_input_buffers() {
        // First buffer: no overlap. Second: exact. Third: exact.
        let ret = make_retention(&[10..20, 30..40]);
        let a = Bytes::from_static(&[1, 2, 3, 4, 5]);
        let b = Bytes::from_static(&[1, 2, 3, 4, 5]);
        let c = Bytes::from_static(&[1, 2, 3, 4, 5]);
        let (ranges, buffers) = ret.filter(vec![0..5, 10..15, 35..40], vec![a, b, c]);
        assert_eq!(ranges, vec![10..15, 35..40]);
        assert_eq!(buffers.len(), 2);
    }

    #[test]
    fn zero_copy_slicing() {
        let ret = make_retention(&[10..20]);
        let payload: Vec<u8> = (0..30).collect();
        let input = Bytes::from(payload);
        let original_ptr = input.as_ptr();
        let (_, buffers) = ret.filter(vec![0..30], vec![input]);
        // The output slice should point into the same allocation,
        // offset by 10 bytes — i.e. no data was copied.
        assert_eq!(buffers[0].as_ptr(), unsafe { original_ptr.add(10) });
    }

    #[test]
    fn adjacent_retention_ranges_are_merged() {
        // Two abutting ranges should merge into one, so a buffer covering
        // both yields a single slice, not two.
        let ret = make_retention(&[10..20, 20..30]);
        assert_eq!(ret.ranges, vec![10..30]);
        let payload: Vec<u8> = (0..40).collect();
        let input = Bytes::from(payload);
        let (ranges, buffers) = ret.filter(vec![0..40], vec![input]);
        assert_eq!(ranges, vec![10..30]);
        assert_eq!(buffers.len(), 1);
    }
}

0 commit comments

Comments
 (0)