Skip to content

Commit 1dd51f2

Browse files
committed
parquet: extract push decoder row budget helper
Extract the push decoder offset/limit accounting into `RowBudget` and use it when planning row-group reads. This centralizes the row-count arithmetic needed to apply offset and limit without changing decoder behavior. It also adds focused tests for plain limit, offset+limit, and empty-selection cases so later frontier work can reuse the same accounting safely. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent 4fa8d2f commit 1dd51f2

1 file changed

Lines changed: 110 additions & 25 deletions

File tree

  • parquet/src/arrow/push_decoder/reader_builder

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 110 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,60 @@ enum RowGroupDecoderState {
8888
Finished,
8989
}
9090

91+
/// Running offset/limit budget shared across row groups.
92+
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
93+
struct RowBudget {
94+
offset: Option<usize>,
95+
limit: Option<usize>,
96+
}
97+
98+
impl RowBudget {
99+
fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
100+
Self { offset, limit }
101+
}
102+
103+
fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
104+
let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
105+
let plan_builder = plan_builder
106+
.limited(row_count)
107+
.with_offset(self.offset)
108+
.with_limit(self.limit)
109+
.build_limited();
110+
let rows_after_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
111+
112+
BudgetedReadPlan {
113+
plan_builder,
114+
rows_before_budget,
115+
rows_after_budget,
116+
remaining_budget: self.advance(rows_before_budget, rows_after_budget),
117+
}
118+
}
119+
120+
fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
121+
if let Some(offset) = &mut self.offset {
122+
// Reduction is either because of offset or limit, as limit is applied
123+
// after offset has been "exhausted" can just use saturating sub here.
124+
*offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
125+
}
126+
127+
if rows_after_budget != 0 {
128+
if let Some(limit) = &mut self.limit {
129+
*limit -= rows_after_budget;
130+
}
131+
}
132+
133+
self
134+
}
135+
}
136+
137+
#[derive(Debug)]
138+
struct BudgetedReadPlan {
139+
plan_builder: ReadPlanBuilder,
140+
rows_before_budget: usize,
141+
rows_after_budget: usize,
142+
remaining_budget: RowBudget,
143+
}
144+
91145
/// Result of a state transition
92146
#[derive(Debug)]
93147
struct NextState {
@@ -533,45 +587,31 @@ impl RowGroupReaderBuilder {
533587
plan_builder,
534588
} = row_group_info;
535589

536-
// Compute the number of rows in the selection before applying limit and offset
537-
let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count);
538-
539-
if rows_before == 0 {
590+
let BudgetedReadPlan {
591+
mut plan_builder,
592+
rows_before_budget,
593+
rows_after_budget,
594+
remaining_budget,
595+
} = RowBudget::new(self.offset, self.limit).apply_to_plan(plan_builder, row_count);
596+
self.offset = remaining_budget.offset;
597+
self.limit = remaining_budget.limit;
598+
599+
if rows_before_budget == 0 {
540600
// ruled out entire row group
541601
return Ok(NextState::result(
542602
RowGroupDecoderState::Finished,
543603
DecodeResult::Finished,
544604
));
545605
}
546606

547-
// Apply any limit and offset
548-
let mut plan_builder = plan_builder
549-
.limited(row_count)
550-
.with_offset(self.offset)
551-
.with_limit(self.limit)
552-
.build_limited();
553-
554-
let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count);
555-
556-
// Update running offset and limit for after the current row group is read
557-
if let Some(offset) = &mut self.offset {
558-
// Reduction is either because of offset or limit, as limit is applied
559-
// after offset has been "exhausted" can just use saturating sub here
560-
*offset = offset.saturating_sub(rows_before - rows_after)
561-
}
562-
563-
if rows_after == 0 {
607+
if rows_after_budget == 0 {
564608
// no rows left after applying limit/offset
565609
return Ok(NextState::result(
566610
RowGroupDecoderState::Finished,
567611
DecodeResult::Finished,
568612
));
569613
}
570614

571-
if let Some(limit) = &mut self.limit {
572-
*limit -= rows_after;
573-
}
574-
575615
let data_request = DataRequestBuilder::new(
576616
row_group_idx,
577617
row_count,
@@ -760,10 +800,55 @@ fn override_selector_strategy_if_needed(
760800
#[cfg(test)]
761801
mod tests {
762802
use super::*;
803+
use crate::arrow::arrow_reader::{RowSelection, RowSelector};
763804

764805
#[test]
765806
// Verify that the size of RowGroupDecoderState does not grow too large
766807
fn test_structure_size() {
767808
assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
768809
}
810+
811+
#[test]
812+
fn test_row_budget_offset_limit_across_row_groups() {
813+
let first =
814+
RowBudget::new(Some(225), Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
815+
assert_eq!(first.rows_before_budget, 200);
816+
assert_eq!(first.rows_after_budget, 0);
817+
assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
818+
assert_eq!(first.plan_builder.num_rows_selected(), Some(0));
819+
820+
let second = first
821+
.remaining_budget
822+
.apply_to_plan(ReadPlanBuilder::new(1024), 200);
823+
assert_eq!(second.rows_before_budget, 200);
824+
assert_eq!(second.rows_after_budget, 20);
825+
assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
826+
assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
827+
}
828+
829+
#[test]
830+
fn test_row_budget_limit_only() {
831+
let budgeted =
832+
RowBudget::new(None, Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
833+
assert_eq!(budgeted.rows_before_budget, 200);
834+
assert_eq!(budgeted.rows_after_budget, 20);
835+
assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
836+
assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
837+
}
838+
839+
#[test]
840+
fn test_row_budget_empty_selection() {
841+
let empty_selection = RowSelection::from(vec![RowSelector::skip(200)]);
842+
let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(
843+
ReadPlanBuilder::new(1024).with_selection(Some(empty_selection)),
844+
200,
845+
);
846+
assert_eq!(budgeted.rows_before_budget, 0);
847+
assert_eq!(budgeted.rows_after_budget, 0);
848+
assert_eq!(
849+
budgeted.remaining_budget,
850+
RowBudget::new(Some(10), Some(20))
851+
);
852+
assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
853+
}
769854
}

0 commit comments

Comments
 (0)