Skip to content

Commit 52cb122

Browse files
committed
parquet: introduce a row-group frontier for push decoding
Move the cross-row-group scan state into a dedicated `RowGroupFrontier`. The frontier now owns the queued row groups, the tail `RowSelection`, the running `RowBudget`, and the conservative "has predicates" flag. Reduce `RowGroupReaderBuilder` to current-row-group work only by threading a budget snapshot into `next_row_group` and returning a typed `RowGroupBuildResult`. This also folds in the selection-frontier cleanup so queued selection state is consumed in one place instead of through ad hoc split/clone logic. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent 1dd51f2 commit 52cb122

3 files changed

Lines changed: 316 additions & 103 deletions

File tree

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::file::metadata::ParquetMetaData;
3030
use crate::util::push_buffers::PushBuffers;
3131
use arrow_array::RecordBatch;
3232
use bytes::Bytes;
33-
use reader_builder::RowGroupReaderBuilder;
33+
use reader_builder::{RowBudget, RowGroupReaderBuilder};
3434
use remaining::RemainingRowGroups;
3535
use std::ops::Range;
3636
use std::sync::Arc;
@@ -181,6 +181,9 @@ impl ParquetPushDecoderBuilder {
181181
// If no row groups were specified, read all of them
182182
let row_groups =
183183
row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
184+
let has_predicates = filter
185+
.as_ref()
186+
.is_some_and(|filter| !filter.predicates.is_empty());
184187

185188
// Prepare to build RowGroup readers
186189
let file_len = 0; // not used in push decoder
@@ -191,8 +194,6 @@ impl ParquetPushDecoderBuilder {
191194
Arc::clone(&parquet_metadata),
192195
fields,
193196
filter,
194-
limit,
195-
offset,
196197
metrics,
197198
max_predicate_cache_size,
198199
buffers,
@@ -204,6 +205,8 @@ impl ParquetPushDecoderBuilder {
204205
parquet_metadata,
205206
row_groups,
206207
selection,
208+
RowBudget::new(offset, limit),
209+
has_predicates,
207210
row_group_reader_builder,
208211
);
209212

@@ -1402,6 +1405,28 @@ mod test {
14021405
expect_finished(decoder.try_decode());
14031406
}
14041407

1408+
#[test]
1409+
fn test_decoder_try_next_reader_offset_limit() {
1410+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1411+
.unwrap()
1412+
.with_offset(225)
1413+
.with_limit(20)
1414+
.build()
1415+
.unwrap();
1416+
1417+
let ranges = expect_needs_data(decoder.try_next_reader());
1418+
push_ranges_to_decoder(&mut decoder, ranges);
1419+
1420+
let reader = expect_data(decoder.try_next_reader());
1421+
let batches = reader
1422+
.map(|batch| batch.expect("expected decoded batch"))
1423+
.collect::<Vec<_>>();
1424+
let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
1425+
assert_eq!(output, TEST_BATCH.slice(225, 20));
1426+
1427+
expect_finished(decoder.try_next_reader());
1428+
}
1429+
14051430
#[test]
14061431
fn test_decoder_row_group_selection() {
14071432
// take only the second row group

0 commit comments

Comments
 (0)