diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 4c667e534366..f905d6fb2ccb 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -30,7 +30,7 @@ use crate::file::metadata::ParquetMetaData; use crate::util::push_buffers::PushBuffers; use arrow_array::RecordBatch; use bytes::Bytes; -use reader_builder::RowGroupReaderBuilder; +use reader_builder::{RowBudget, RowGroupReaderBuilder}; use remaining::RemainingRowGroups; use std::ops::Range; use std::sync::Arc; @@ -181,6 +181,9 @@ impl ParquetPushDecoderBuilder { // If no row groups were specified, read all of them let row_groups = row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect()); + let has_predicates = filter + .as_ref() + .is_some_and(|filter| !filter.predicates.is_empty()); // Prepare to build RowGroup readers let file_len = 0; // not used in push decoder @@ -191,8 +194,6 @@ impl ParquetPushDecoderBuilder { Arc::clone(&parquet_metadata), fields, filter, - limit, - offset, metrics, max_predicate_cache_size, buffers, @@ -204,6 +205,8 @@ impl ParquetPushDecoderBuilder { parquet_metadata, row_groups, selection, + RowBudget::new(offset, limit), + has_predicates, row_group_reader_builder, ); @@ -1402,6 +1405,28 @@ mod test { expect_finished(decoder.try_decode()); } + #[test] + fn test_decoder_try_next_reader_offset_limit() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(225) + .with_limit(20) + .build() + .unwrap(); + + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + + let reader = expect_data(decoder.try_next_reader()); + let batches = reader + .map(|batch| batch.expect("expected decoded batch")) + .collect::>(); + let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap(); + assert_eq!(output, TEST_BATCH.slice(225, 20)); + + expect_finished(decoder.try_next_reader()); + } + #[test] fn test_decoder_row_group_selection() { // take only the second row group diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 60e50d29524e..fd9737f7fa9b 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -18,7 +18,6 @@ mod data; mod filter; -use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; @@ -42,12 +41,13 @@ use filter::FilterInfo; use std::ops::Range; use std::sync::{Arc, RwLock}; -/// The current row group being read and the read plan +/// The current row group being read, its read plan, and its offset/limit budget. #[derive(Debug)] struct RowGroupInfo { row_group_idx: usize, row_count: usize, plan_builder: ReadPlanBuilder, + budget: RowBudget, } /// This is the inner state machine for reading a single row group. @@ -88,6 +88,95 @@ enum RowGroupDecoderState { Finished, } +/// Running offset/limit budget shared across row groups. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) struct RowBudget { + offset: Option, + limit: Option, +} + +impl RowBudget { + pub(crate) fn new(offset: Option, limit: Option) -> Self { + Self { offset, limit } + } + + pub(crate) fn is_exhausted(self) -> bool { + matches!(self.limit, Some(0)) + } + + pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize { + let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0)); + match self.limit { + Some(limit) => rows_after_offset.min(limit), + None => rows_after_offset, + } + } + + /// Returns the number of selected rows needed before applying the offset. + fn selected_row_limit(self) -> Option { + self.limit + .map(|limit| limit.saturating_add(self.offset.unwrap_or(0))) + } + + fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan { + let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count); + let plan_builder = plan_builder + .limited(row_count) + .with_offset(self.offset) + .with_limit(self.limit) + .build_limited(); + let rows_after_budget = self.rows_after(rows_before_budget); + + BudgetedReadPlan { + plan_builder, + rows_before_budget, + rows_after_budget, + remaining_budget: self.advance(rows_before_budget, rows_after_budget), + } + } + + pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self { + if let Some(offset) = &mut self.offset { + // Reduction is either because of offset or limit, as limit is applied + // after offset has been "exhausted" can just use saturating sub here. + *offset = offset.saturating_sub(rows_before_budget - rows_after_budget); + } + + if rows_after_budget != 0 { + if let Some(limit) = &mut self.limit { + *limit -= rows_after_budget; + } + } + + self + } +} + +#[derive(Debug)] +struct BudgetedReadPlan { + plan_builder: ReadPlanBuilder, + rows_before_budget: usize, + rows_after_budget: usize, + remaining_budget: RowBudget, +} + +#[derive(Debug)] +pub(crate) enum RowGroupBuildResult { + /// The active row group is complete without producing a reader. + Finished { + /// Budget remaining after applying this row group's selection. + remaining_budget: RowBudget, + }, + /// More bytes are needed before the active row group can make progress. + NeedsData(Vec>), + /// The active row group produced a reader. + Data { + batch_reader: ParquetRecordBatchReader, + /// Budget remaining after applying this row group's selection. + remaining_budget: RowBudget, + }, +} + /// Result of a state transition #[derive(Debug)] struct NextState { @@ -96,7 +185,7 @@ struct NextState { /// /// * `Some`: the processing should stop and return the result /// * `None`: processing should continue - result: Option>, + result: Option, } impl NextState { @@ -111,10 +200,7 @@ impl NextState { } /// Create a NextState with a result that should be returned - fn result( - next_state: RowGroupDecoderState, - result: DecodeResult, - ) -> Self { + fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) -> Self { Self { next_state, result: Some(result), @@ -144,12 +230,6 @@ pub(crate) struct RowGroupReaderBuilder { /// Optional filter filter: Option, - /// Limit to apply to remaining row groups (decremented as rows are read) - limit: Option, - - /// Offset to apply to remaining row groups (decremented as rows are read) - offset: Option, - /// The size in bytes of the predicate cache to use /// /// See [`RowGroupCache`] for details. @@ -180,8 +260,6 @@ impl RowGroupReaderBuilder { metadata: Arc, fields: Option>, filter: Option, - limit: Option, - offset: Option, metrics: ArrowReaderMetrics, max_predicate_cache_size: usize, buffers: PushBuffers, @@ -193,8 +271,6 @@ impl RowGroupReaderBuilder { metadata, fields, filter, - limit, - offset, metrics, max_predicate_cache_size, row_selection_policy, @@ -233,12 +309,18 @@ impl RowGroupReaderBuilder { }) } + /// Returns true if this builder is currently decoding a row group. + pub(crate) fn has_active_row_group(&self) -> bool { + !matches!(self.state, Some(RowGroupDecoderState::Finished)) + } + /// Setup this reader to read the next row group pub(crate) fn next_row_group( &mut self, row_group_idx: usize, row_count: usize, selection: Option, + budget: RowBudget, ) -> Result<(), ParquetError> { let state = self.take_state()?; if !matches!(state, RowGroupDecoderState::Finished) { @@ -254,22 +336,20 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, }; self.state = Some(RowGroupDecoderState::Start { row_group_info }); Ok(()) } - /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader. - /// - /// If more data is needed, returns [`DecodeResult::NeedsData`] with the - /// ranges of data that are needed to proceed. + /// Try to build the next `ParquetRecordBatchReader` for the active row group. /// - /// If a [`ParquetRecordBatchReader`] is ready, it is returned in - /// `DecodeResult::Data`. - pub(crate) fn try_build( - &mut self, - ) -> Result, ParquetError> { + /// Returns [`RowGroupBuildResult::NeedsData`] if more data is needed, + /// [`RowGroupBuildResult::Data`] if a reader is ready, or + /// [`RowGroupBuildResult::Finished`] if the row group completed without + /// producing a reader. + pub(crate) fn try_build(&mut self) -> Result { loop { let current_state = self.take_state()?; // Try to transition the decoder. @@ -310,18 +390,10 @@ impl RowGroupReaderBuilder { ) -> Result { let result = match current_state { RowGroupDecoderState::Start { row_group_info } => { - // Short-circuit once the overall output limit is exhausted. - // - // `self.limit` tracks how many more rows the reader is still - // allowed to emit and is decremented as each row group is - // planned in `StartData`, so `Some(0)` means earlier row - // groups have already produced the full requested output. - if matches!(self.limit, Some(0)) { - return Ok(NextState::result( - RowGroupDecoderState::Finished, - DecodeResult::Finished, - )); - } + debug_assert!( + !row_group_info.budget.is_exhausted(), + "RowGroupFrontier should not hand off row groups after the output limit is exhausted" + ); let column_chunks = None; // no prior column chunks @@ -371,6 +443,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, } = row_group_info; // If nothing is selected, we are done with this row group @@ -379,7 +452,9 @@ impl RowGroupReaderBuilder { self.filter = Some(filter_info.into_filter()); return Ok(NextState::result( RowGroupDecoderState::Finished, - DecodeResult::Finished, + RowGroupBuildResult::Finished { + remaining_budget: budget, + }, )); } @@ -405,6 +480,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, }; NextState::again(RowGroupDecoderState::WaitingOnFilterData { @@ -428,7 +504,7 @@ impl RowGroupReaderBuilder { filter_info, data_request, }, - DecodeResult::NeedsData(needed_ranges), + RowGroupBuildResult::NeedsData(needed_ranges), )); } @@ -437,6 +513,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, mut plan_builder, + budget, } = row_group_info; let predicate = filter_info.current(); @@ -476,10 +553,10 @@ impl RowGroupReaderBuilder { // When this is the final predicate in the chain and an output // limit is set, tell the filter evaluation to stop once enough // matching rows have been accumulated. - let predicate_limit = self - .limit - .filter(|_| filter_info.is_last()) - .map(|l| l.saturating_add(self.offset.unwrap_or(0))); + let predicate_limit = filter_info + .is_last() + .then(|| budget.selected_row_limit()) + .flatten(); // Evaluate the filter via `with_predicate_options`, opting into // early termination when this is the final predicate and an @@ -495,6 +572,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, }; // Take back the column chunks that were read @@ -531,47 +609,32 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, } = row_group_info; - // Compute the number of rows in the selection before applying limit and offset - let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count); + let BudgetedReadPlan { + mut plan_builder, + rows_before_budget, + rows_after_budget, + remaining_budget, + } = budget.apply_to_plan(plan_builder, row_count); - if rows_before == 0 { + if rows_before_budget == 0 { // ruled out entire row group return Ok(NextState::result( RowGroupDecoderState::Finished, - DecodeResult::Finished, + RowGroupBuildResult::Finished { remaining_budget }, )); } - // Apply any limit and offset - let mut plan_builder = plan_builder - .limited(row_count) - .with_offset(self.offset) - .with_limit(self.limit) - .build_limited(); - - let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count); - - // Update running offset and limit for after the current row group is read - if let Some(offset) = &mut self.offset { - // Reduction is either because of offset or limit, as limit is applied - // after offset has been "exhausted" can just use saturating sub here - *offset = offset.saturating_sub(rows_before - rows_after) - } - - if rows_after == 0 { + if rows_after_budget == 0 { // no rows left after applying limit/offset return Ok(NextState::result( RowGroupDecoderState::Finished, - DecodeResult::Finished, + RowGroupBuildResult::Finished { remaining_budget }, )); } - if let Some(limit) = &mut self.limit { - *limit -= rows_after; - } - let data_request = DataRequestBuilder::new( row_group_idx, row_count, @@ -597,6 +660,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget: remaining_budget, }; NextState::again(RowGroupDecoderState::WaitingOnData { @@ -620,7 +684,7 @@ impl RowGroupReaderBuilder { data_request, cache_info, }, - DecodeResult::NeedsData(needed_ranges), + RowGroupBuildResult::NeedsData(needed_ranges), )); } @@ -629,6 +693,7 @@ impl RowGroupReaderBuilder { row_group_idx, row_count, plan_builder, + budget, } = row_group_info; let row_group = data_request.try_into_in_memory_row_group( @@ -656,11 +721,18 @@ impl RowGroupReaderBuilder { }?; let reader = ParquetRecordBatchReader::new(array_reader, plan); - NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) + NextState::result( + RowGroupDecoderState::Finished, + RowGroupBuildResult::Data { + batch_reader: reader, + remaining_budget: budget, + }, + ) } RowGroupDecoderState::Finished => { - // nothing left to read - NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished) + return Err(ParquetError::General(String::from( + "Internal Error: try_build called without an active row group", + ))); } }; Ok(result) @@ -760,10 +832,55 @@ fn override_selector_strategy_if_needed( #[cfg(test)] mod tests { use super::*; + use crate::arrow::arrow_reader::{RowSelection, RowSelector}; #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 200); + assert_eq!(std::mem::size_of::(), 232); + } + + #[test] + fn test_row_budget_offset_limit_across_row_groups() { + let first = + RowBudget::new(Some(225), Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200); + assert_eq!(first.rows_before_budget, 200); + assert_eq!(first.rows_after_budget, 0); + assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20))); + assert_eq!(first.plan_builder.num_rows_selected(), Some(0)); + + let second = first + .remaining_budget + .apply_to_plan(ReadPlanBuilder::new(1024), 200); + assert_eq!(second.rows_before_budget, 200); + assert_eq!(second.rows_after_budget, 20); + assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0))); + assert_eq!(second.plan_builder.num_rows_selected(), Some(20)); + } + + #[test] + fn test_row_budget_limit_only() { + let budgeted = + RowBudget::new(None, Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200); + assert_eq!(budgeted.rows_before_budget, 200); + assert_eq!(budgeted.rows_after_budget, 20); + assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0))); + assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20)); + } + + #[test] + fn test_row_budget_empty_selection() { + let empty_selection = RowSelection::from(vec![RowSelector::skip(200)]); + let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan( + ReadPlanBuilder::new(1024).with_selection(Some(empty_selection)), + 200, + ); + assert_eq!(budgeted.rows_before_budget, 0); + assert_eq!(budgeted.rows_after_budget, 0); + assert_eq!( + budgeted.remaining_budget, + RowBudget::new(Some(10), Some(20)) + ); + assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0)); } } diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index 2986ca0da8d8..5a3293bf3c45 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -17,7 +17,9 @@ use crate::DecodeResult; use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; -use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder; +use crate::arrow::push_decoder::reader_builder::{ + RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, +}; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; use bytes::Bytes; @@ -25,21 +27,163 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; -/// State machine that tracks the remaining high level chunks (row groups) of -/// Parquet data are left to read. -/// -/// This is currently a row group, but the author aspires to extend the pattern -/// to data boundaries other than RowGroups in the future. +/// Plan for the next queued row group after row-selection slicing. #[derive(Debug)] -pub(crate) struct RemainingRowGroups { - /// The underlying Parquet metadata - parquet_metadata: Arc, +enum QueuedRowGroupDecision { + /// Hand this row group to the builder. + Read(NextRowGroup), + /// Skip this row group, and keep scanning with the updated budget. + Skip { remaining_budget: RowBudget }, +} - /// The row groups that have not yet been read - row_groups: VecDeque, +/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`]. +#[derive(Debug)] +struct NextRowGroup { + row_group_idx: usize, + row_count: usize, + selection: Option, + budget: RowBudget, +} - /// Remaining selection to apply to the next row groups +#[derive(Debug)] +struct RowGroupFrontier { + /// Metadata used to resolve row counts for queued row groups. + parquet_metadata: Arc, + /// Row group indices not yet handed to the builder. + row_groups: VecDeque, + /// Cross-row-group cursor for the optional global row selection. selection: Option, + /// Offset/limit budget before the next readable row group is planned. + budget: RowBudget, + /// If predicates are present, row groups with selected rows must be read so + /// the predicate can decide whether they are actually needed. + has_predicates: bool, +} + +impl RowGroupFrontier { + fn new( + parquet_metadata: Arc, + row_groups: Vec, + selection: Option, + budget: RowBudget, + has_predicates: bool, + ) -> Self { + Self { + parquet_metadata, + row_groups: VecDeque::from(row_groups), + selection, + budget, + has_predicates, + } + } + + fn row_group_num_rows(&self, row_group_idx: usize) -> Result { + self.parquet_metadata + .row_group(row_group_idx) + .num_rows() + .try_into() + .map_err(|e| ParquetError::General(format!("Row count overflow: {e}"))) + } + + fn update_budget_after_row_group(&mut self, budget: RowBudget) { + self.budget = budget; + } + + fn clear_remaining(&mut self) { + self.selection = None; + self.row_groups.clear(); + } + + /// Plan whether a selected row group should be read or skipped. + /// + /// Selection-only skips are handled before this method is called. This + /// method applies the remaining offset/limit budget and predicate + /// conservatism. + fn plan_selected_row_group( + &self, + next_row_group: NextRowGroup, + selected_rows: usize, + ) -> QueuedRowGroupDecision { + if self.has_predicates { + return QueuedRowGroupDecision::Read(next_row_group); + } + + let rows_after_budget = self.budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return QueuedRowGroupDecision::Read(next_row_group); + } + + QueuedRowGroupDecision::Skip { + remaining_budget: self.budget.advance(selected_rows, rows_after_budget), + } + } + + /// Advance queued row groups until one should be handed to the builder. + fn next_readable_row_group(&mut self) -> Result, ParquetError> { + loop { + let Some(&row_group_idx) = self.row_groups.front() else { + return Ok(None); + }; + if self.budget.is_exhausted() + || self + .selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + self.clear_remaining(); + return Ok(None); + } + + let row_count = self.row_group_num_rows(row_group_idx)?; + let (selection, selected_rows) = match self.selection.as_mut() { + Some(selection) => { + let selection = selection.split_off(row_count); + let selected_rows = selection.row_count(); + if selected_rows == 0 { + self.row_groups.pop_front(); + continue; + } + + let selection = if selected_rows == row_count { + None + } else { + Some(selection) + }; + (selection, selected_rows) + } + None => (None, row_count), + }; + + let next_row_group = NextRowGroup { + row_group_idx, + row_count, + selection, + budget: self.budget, + }; + + match self.plan_selected_row_group(next_row_group, selected_rows) { + QueuedRowGroupDecision::Read(next_row_group) => { + self.row_groups.pop_front(); + return Ok(Some(next_row_group)); + } + QueuedRowGroupDecision::Skip { remaining_budget } => { + self.row_groups.pop_front(); + self.budget = remaining_budget; + } + } + } + } +} + +/// State machine that tracks the remaining high level chunks (row groups) of +/// Parquet data left to read. +/// +/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next +/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row group. +#[derive(Debug)] +pub(crate) struct RemainingRowGroups { + /// Cross-row-group scan state for queued work. + frontier: RowGroupFrontier, /// State for building the reader for the current row group row_group_reader_builder: RowGroupReaderBuilder, @@ -50,12 +194,18 @@ impl RemainingRowGroups { parquet_metadata: Arc, row_groups: Vec, selection: Option, + budget: RowBudget, + has_predicates: bool, row_group_reader_builder: RowGroupReaderBuilder, ) -> Self { Self { - parquet_metadata, - row_groups: VecDeque::from(row_groups), - selection, + frontier: RowGroupFrontier::new( + parquet_metadata, + row_groups, + selection, + budget, + has_predicates, + ), row_group_reader_builder, } } @@ -82,42 +232,48 @@ impl RemainingRowGroups { &mut self, ) -> Result, ParquetError> { loop { - // Are we ready yet to start reading? - let result: DecodeResult = - self.row_group_reader_builder.try_build()?; - match result { - DecodeResult::Finished => { + if !self.row_group_reader_builder.has_active_row_group() { + // We are done with the previous row group, seek to the next one + // from the frontier, if any. + + match self.frontier.next_readable_row_group()? { + Some(NextRowGroup { + row_group_idx, + row_count, + selection, + budget, + }) => { + self.row_group_reader_builder.next_row_group( + row_group_idx, + row_count, + selection, + budget, + )?; + } + None => return Ok(DecodeResult::Finished), + } + } + + match self.row_group_reader_builder.try_build()? { + RowGroupBuildResult::Finished { remaining_budget } => { + self.frontier + .update_budget_after_row_group(remaining_budget); // reader is done, proceed to the next row group - // fall through to the next row group - // This happens if the row group was completely filtered out } - DecodeResult::NeedsData(ranges) => { + RowGroupBuildResult::NeedsData(ranges) => { // need more data to proceed return Ok(DecodeResult::NeedsData(ranges)); } - DecodeResult::Data(batch_reader) => { + RowGroupBuildResult::Data { + batch_reader, + remaining_budget, + } => { + self.frontier + .update_budget_after_row_group(remaining_budget); // ready to read the row group return Ok(DecodeResult::Data(batch_reader)); } } - - // No current reader, proceed to the next row group if any - let row_group_idx = match self.row_groups.pop_front() { - None => return Ok(DecodeResult::Finished), - Some(idx) => idx, - }; - - let row_count: usize = self - .parquet_metadata - .row_group(row_group_idx) - .num_rows() - .try_into() - .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?; - - let selection = self.selection.as_mut().map(|s| s.split_off(row_count)); - self.row_group_reader_builder - .next_row_group(row_group_idx, row_count, selection)?; - // the next iteration will try to build the reader for the new row group } } }