feat(parquet): separate push decoder frontier state from row-group decoding#9804
HippoBaro wants to merge 2 commits into apache:main from frontier_row_group_selection
Conversation
Force-pushed from 600500d to 307e4d8
alamb left a comment
Thank you @HippoBaro -- I very much like the idea of encapsulating the budget (offset/limit) and the state about what row groups remain into a new structure and I think in general this is headed in the right direction. I had a few suggestions about structure and comments but I think this is pretty close in general
```rust
let row_count = self.row_group_num_rows(row_group_idx)?;
let selection = self.selection.take_for_row_group(row_count);

match self.classify_queued_row_group(row_count, selection.as_ref()) {
```
I wonder if we can encapsulate the logic to "get the next chunk of work" into a single function rather than splitting it across three separate functions.
For example, what if QueuedRowGroupDecision::NeedThis returned the row group index directly -- that way you wouldn't have to check for being empty above and again down here.
Also, maybe QueuedRowGroupDecision::NeedThis could simply have a NextRowGroup field 🤔
Agreed. I reworked this so next_readable_row_group is now the single loop that advances queued row groups until it either finds work or determines there is no more work.
The code is now much nicer 🙇
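A minimal sketch of what such a single advancing loop could look like. All names and types here (`Frontier`, `NextWork`, the `remaining_limit` field) are illustrative stand-ins, not the actual arrow-rs API:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum NextWork {
    /// A row group that needs decoding, with its index.
    RowGroup(usize),
    /// No more row groups can ever produce rows.
    Done,
}

struct Frontier {
    /// Indices of row groups not yet examined.
    queued: VecDeque<usize>,
    /// Rows still allowed by the remaining limit.
    remaining_limit: usize,
    /// Per-row-group row counts (a stand-in for file metadata).
    row_counts: Vec<usize>,
}

impl Frontier {
    /// Single loop: keep advancing over queued row groups until it
    /// either finds work or determines there is no more work.
    fn next_readable_row_group(&mut self) -> NextWork {
        while let Some(idx) = self.queued.pop_front() {
            if self.remaining_limit == 0 {
                return NextWork::Done; // budget exhausted: nothing else is live
            }
            let rows = self.row_counts[idx];
            if rows == 0 {
                continue; // empty row group: skip and keep scanning
            }
            self.remaining_limit = self.remaining_limit.saturating_sub(rows);
            return NextWork::RowGroup(idx);
        }
        NextWork::Done
    }
}
```

The caller then only matches on `NextWork`, with no separate emptiness checks before and after.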
```rust
/// Remaining selection to apply to the next row groups
selection: Option<RowSelection>,
/// Cross-row-group scan state for queued work.
frontier: RowGroupFrontier,
```
Can you remind me why the data about the next row groups is being encapsulated in RowGroupFrontier? Do you have more plans for it in future PRs?
For clearer separation of concerns. The builder now only owns the currently active row group and its decode state, while RowGroupFrontier owns the look-ahead state needed to decide which row group can be handed to the builder next.
This is also the piece that I care about in the context of the PushBuffers work, because it centralizes all the row-group pruning logic, which makes releasing prefetched buffers easy.
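The ownership split described above could be sketched roughly as follows. These are hypothetical simplified types, not the real arrow-rs structs, and `is_live` is an invented illustrative method:

```rust
/// Remaining rows the scan may still produce (offset/limit accounting).
#[derive(Clone, Copy)]
struct RowBudget {
    skip: usize,
    limit: usize,
}

/// Owns the look-ahead state needed to decide which row group
/// can be handed to the builder next.
struct RowGroupFrontier {
    queued: Vec<usize>,   // row groups not yet handed to the builder
    budget: RowBudget,    // running cross-row-group budget
    has_predicates: bool, // stay conservative when filters may keep rows
}

/// Owns only the currently active row group and its decode state.
struct RowGroupReaderBuilder {
    active: Option<usize>, // index of the row group being decoded
}

impl RowGroupFrontier {
    /// Answer the one question buffer release depends on:
    /// can this row group ever need bytes again? Centralizing this
    /// here is what makes row-group-level release easy later.
    fn is_live(&self, idx: usize) -> bool {
        self.has_predicates || self.queued.contains(&idx)
    }
}
```

With this split, a prefetched buffer for row group `idx` could be dropped as soon as `is_live(idx)` turns false.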
```rust
match result {
    DecodeResult::Finished => {
        match self.row_group_reader_builder.try_build()? {
            RowGroupBuildResult::Idle => {
```
Why the term "Idle"? Maybe RowGroupBuildResult::Skipped would better match the semantics
This is an intermediary step between Finished and the processing of the next row group. Idle is appropriate in that narrow sense, and Skipped would not be, because we are not skipping anything here.
Your remark makes me think this has to go anyway because it's confusing: I'll rework the code to move from Finished to the next state directly instead!
```rust
}
DecodeResult::NeedsData(ranges) => {
RowGroupBuildResult::Finished { remaining_budget } => {
    self.frontier.advance_budget(remaining_budget);
```
I feel like something is not quite right here, as both the reader_builder and the frontier have a budget. It seems like this could eventually result in a discrepancy when they don't match (also, this is perhaps why there can be an internal error).
Thanks! The frontier owns the cross-row-group budget until it hands a NextRowGroup to the builder, and the builder owns that budget only while decoding the active row group, then returns remaining_budget.
There is no longer any possibility of corrupted state.
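The handoff described above can be sketched as a move of the budget between the two owners, so that exactly one side holds the authoritative value at any instant. The names (`lend`, `finish`, `advance_budget` taking a budget) are illustrative, not the actual arrow-rs signatures:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct RowBudget {
    remaining: usize,
}

struct Frontier {
    /// `Some` only while the frontier owns the budget.
    budget: Option<RowBudget>,
}

struct Builder {
    /// `Some` only while the builder is decoding the active row group.
    budget: Option<RowBudget>,
}

impl Frontier {
    /// Hand the budget to the builder along with the next row group.
    fn lend(&mut self, builder: &mut Builder) {
        builder.budget = self.budget.take(); // frontier no longer holds it
    }

    /// Absorb the leftover budget once the row group is decoded.
    fn advance_budget(&mut self, remaining: RowBudget) {
        self.budget = Some(remaining);
    }
}

impl Builder {
    /// Decode `rows` rows and return the budget that is left over.
    fn finish(&mut self, rows: usize) -> RowBudget {
        let b = self.budget.take().expect("builder holds the budget");
        RowBudget { remaining: b.remaining.saturating_sub(rows) }
    }
}
```

Because the budget is moved rather than copied, the two components can never disagree about its value.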
run benchmark arrow_reader

🤖 Arrow criterion benchmark running (GKE): comparing frontier_row_group_selection (307e4d8) to b93240a (merge-base)

🤖 Arrow criterion benchmark completed (GKE)
Force-pushed from 307e4d8 to 72d5c60
Extract the push decoder offset/limit accounting into `RowBudget` and use it when planning row-group reads. This centralizes the row-count arithmetic needed to apply offset and limit without changing decoder behavior. It also adds focused tests for plain limit, offset+limit, and empty-selection cases so later frontier work can reuse the same accounting safely.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
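The row-count arithmetic that a budget type like this centralizes can be sketched as follows. The struct and its `take_for_row_group` method are hypothetical simplifications for illustration, not the actual `RowBudget` API:

```rust
#[derive(Debug, PartialEq)]
struct RowBudget {
    /// Rows still to skip before any are emitted (offset).
    offset: usize,
    /// Rows still allowed to be emitted, if bounded (limit).
    limit: Option<usize>,
}

impl RowBudget {
    /// Plan a row group of `n` rows: returns (rows to skip, rows to
    /// take) and advances the running offset/limit state accordingly.
    fn take_for_row_group(&mut self, n: usize) -> (usize, usize) {
        // Consume as much of the remaining offset as this row group covers.
        let skip = self.offset.min(n);
        self.offset -= skip;

        // Of what is left, take no more than the remaining limit allows.
        let available = n - skip;
        let take = match self.limit {
            Some(l) => l.min(available),
            None => available,
        };
        if let Some(l) = &mut self.limit {
            *l -= take;
        }
        (skip, take)
    }
}
```

The key property is that both the offset and the limit can span row-group boundaries, which is exactly the state that is error-prone when spread across components.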
Move the cross-row-group scan state into a dedicated `RowGroupFrontier`. The frontier now owns the queued row groups, the tail `RowSelection`, the running `RowBudget`, and the conservative "has predicates" flag. Reduce `RowGroupReaderBuilder` to current-row-group work only by threading a budget snapshot into `next_row_group` and returning a typed `RowGroupBuildResult`. This also folds in the selection-frontier cleanup so queued selection state is consumed in one place instead of through ad hoc split/clone logic.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
Force-pushed from 72d5c60 to 52cb122
@alamb Thank you for the thorough review. I've updated the code as suggested. State is no longer duplicated and the state machines are simpler. I also rebased to play nicely with changes made in #9766.
Which issue does this PR close?

PushBuffers boundary-agnostic for prefetch IO #9697

Rationale for this change
#9697 aims to make staged buffer management in the push decoder more explicit. In doing so, it exposes a structural problem: the logic for deciding whether a row group is still live, skipped, or unreachable is spread across several parts of the decoder.
This matters because row-group-level buffer release depends on a single question having a clear answer: can this row group ever need bytes again? That answer depends on the queued row groups, the remaining selection, the running offset/limit budget, and whether predicates require the decoder to stay conservative. Today, that state is split across multiple components, which makes the release policy difficult to centralize cleanly.
What changes are included in this PR?
This PR introduces a clearer ownership boundary in the push decoder:

- Extracts the offset/limit accounting into a dedicated `RowBudget` and uses it when planning row-group reads.
- Moves the cross-row-group scan state (queued row groups, tail `RowSelection`, running `RowBudget`, and the conservative "has predicates" flag) into a new `RowGroupFrontier`.
- Reduces `RowGroupReaderBuilder` to current-row-group work only, threading a budget snapshot into `next_row_group` and returning a typed `RowGroupBuildResult`.
This does not implement row-group-level buffer release directly, but it establishes the structure needed for that follow-up work. It should also make future pruning rules easier to add and maintain.
Are these changes tested?
All existing tests pass, and the refactor adds focused coverage for the extracted budget logic and the frontier-driven `try_next_reader` path.

Are there any user-facing changes?
None.