Skip to content

Commit 5b0a69a

Browse files
adriangbclaude
andcommitted
feat: split Parquet files into row-group-sized morsels
Each Parquet file previously produced a single morsel containing one `ParquetPushDecoder` over the full pruned `ParquetAccessPlan`. Morselize at row-group granularity instead: after all pruning work is done, pack surviving row groups into chunks bounded by a per-morsel row budget and compressed-byte budget (defaults: 100k rows, 64 MiB). Each chunk becomes its own stream so the executor can interleave row-group decode work with other operators and — in a follow-up — let sibling `FileStream`s steal row-group-sized units of work across partitions. A single oversized row group still becomes its own morsel; no sub-row-group splitting is introduced. `EarlyStoppingStream` (which is driven by the non-Clone `FilePruner`) is attached only to the first morsel's stream so the whole file can still short-circuit on dynamic-filter narrowing. Row-group reversal is applied per-chunk on the `PreparedAccessPlan` and the chunk list is reversed so reverse output order is preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent af67cdd commit 5b0a69a

3 files changed

Lines changed: 718 additions & 137 deletions

File tree

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,77 @@ impl ParquetAccessPlan {
349349

350350
PreparedAccessPlan::new(row_group_indexes, row_selection)
351351
}
352+
353+
/// Split this plan into an ordered list of sub-plans ("chunks"), each of
354+
/// which represents a contiguous prefix of work packed together.
355+
///
356+
/// Each returned plan has the same `len()` as `self`. Row groups outside
357+
/// the chunk are set to [`RowGroupAccess::Skip`]; row groups inside the
358+
/// chunk keep their original [`RowGroupAccess`].
359+
///
360+
/// Chunks are formed by walking `self.row_groups` in order and grouping
361+
/// consecutive entries with `should_scan() == true`. A new chunk is started
362+
/// whenever adding the next scannable row group would push the accumulated
363+
/// row count past `max_rows` or compressed byte size past `max_bytes`. A
364+
/// single row group that already exceeds either limit becomes its own
365+
/// chunk (no sub-row-group split is performed).
366+
///
367+
/// [`RowGroupAccess::Skip`] entries are carried silently in whichever chunk
368+
/// is active at that point; they contribute no rows or bytes.
369+
///
370+
/// If there are no scannable row groups, the result is empty.
371+
pub(crate) fn split_into_chunks(
372+
self,
373+
row_group_meta_data: &[RowGroupMetaData],
374+
max_rows: u64,
375+
max_bytes: u64,
376+
) -> Vec<ParquetAccessPlan> {
377+
assert_eq!(self.row_groups.len(), row_group_meta_data.len());
378+
379+
let len = self.row_groups.len();
380+
let mut chunks: Vec<ParquetAccessPlan> = Vec::new();
381+
let mut current: Option<(ParquetAccessPlan, u64, u64)> = None;
382+
383+
for (idx, access) in self.row_groups.into_iter().enumerate() {
384+
if !access.should_scan() {
385+
// Skip entries are attached to the currently open chunk (if
386+
// any) so they do not force a chunk boundary. They contribute
387+
// zero rows/bytes.
388+
if let Some((plan, _, _)) = current.as_mut() {
389+
plan.row_groups[idx] = access;
390+
}
391+
continue;
392+
}
393+
394+
let rg_meta = &row_group_meta_data[idx];
395+
let rg_rows = rg_meta.num_rows().max(0) as u64;
396+
let rg_bytes = rg_meta.compressed_size().max(0) as u64;
397+
398+
if let Some((plan, acc_rows, acc_bytes)) = current.as_mut() {
399+
let exceeds = acc_rows.saturating_add(rg_rows) > max_rows
400+
|| acc_bytes.saturating_add(rg_bytes) > max_bytes;
401+
if exceeds {
402+
chunks.push(current.take().unwrap().0);
403+
} else {
404+
plan.row_groups[idx] = access;
405+
*acc_rows += rg_rows;
406+
*acc_bytes += rg_bytes;
407+
continue;
408+
}
409+
}
410+
411+
// Start a new chunk with this row group.
412+
let mut plan = ParquetAccessPlan::new_none(len);
413+
plan.row_groups[idx] = access;
414+
current = Some((plan, rg_rows, rg_bytes));
415+
}
416+
417+
if let Some((plan, _, _)) = current {
418+
chunks.push(plan);
419+
}
420+
421+
chunks
422+
}
352423
}
353424

354425
/// Represents a prepared, fully resolved [`ParquetAccessPlan`]
@@ -600,6 +671,180 @@ mod test {
600671
.collect()
601672
});
602673

674+
/// Build metadata for row groups with the given `(num_rows, compressed_bytes)`
675+
/// pairs. Returned metadata has one `BYTE_ARRAY` column per row group.
676+
fn row_groups_with_bytes(specs: &[(i64, i64)]) -> Vec<RowGroupMetaData> {
677+
let schema_descr = get_test_schema_descr();
678+
specs
679+
.iter()
680+
.map(|(num_rows, compressed)| {
681+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
682+
.set_num_values(*num_rows)
683+
.set_total_compressed_size(*compressed)
684+
.build()
685+
.unwrap();
686+
687+
RowGroupMetaData::builder(schema_descr.clone())
688+
.set_num_rows(*num_rows)
689+
.set_column_metadata(vec![column])
690+
.build()
691+
.unwrap()
692+
})
693+
.collect()
694+
}
695+
696+
fn access_kinds(plan: &ParquetAccessPlan) -> Vec<&'static str> {
697+
plan.inner()
698+
.iter()
699+
.map(|rg| match rg {
700+
RowGroupAccess::Skip => "skip",
701+
RowGroupAccess::Scan => "scan",
702+
RowGroupAccess::Selection(_) => "sel",
703+
})
704+
.collect()
705+
}
706+
707+
#[test]
708+
fn test_split_into_chunks_empty() {
709+
let plan = ParquetAccessPlan::new(vec![]);
710+
let chunks = plan.split_into_chunks(&[], 1000, 1000);
711+
assert!(chunks.is_empty());
712+
}
713+
714+
#[test]
715+
fn test_split_into_chunks_all_skip() {
716+
let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000)]);
717+
let plan = ParquetAccessPlan::new_none(2);
718+
let chunks = plan.split_into_chunks(&meta, 1000, 10_000);
719+
assert!(chunks.is_empty());
720+
}
721+
722+
#[test]
723+
fn test_split_into_chunks_one_per_row_group() {
724+
// Each row group is already at the per-morsel limit, so each becomes
725+
// its own chunk.
726+
let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000), (100, 1_000)]);
727+
let plan = ParquetAccessPlan::new_all(3);
728+
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
729+
assert_eq!(chunks.len(), 3);
730+
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
731+
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "skip"]);
732+
assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]);
733+
}
734+
735+
#[test]
736+
fn test_split_into_chunks_packs_small() {
737+
// Three small row groups fit within one chunk by rows AND bytes.
738+
let meta = row_groups_with_bytes(&[(30, 100), (30, 100), (30, 100)]);
739+
let plan = ParquetAccessPlan::new_all(3);
740+
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
741+
assert_eq!(chunks.len(), 1);
742+
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "scan", "scan"]);
743+
}
744+
745+
#[test]
746+
fn test_split_into_chunks_oversized_single() {
747+
// First row group alone exceeds max_rows; still becomes its own chunk
748+
// (no sub-row-group split).
749+
let meta = row_groups_with_bytes(&[(1_000, 100), (10, 100), (10, 100)]);
750+
let plan = ParquetAccessPlan::new_all(3);
751+
let chunks = plan.split_into_chunks(&meta, 100, 10_000);
752+
assert_eq!(chunks.len(), 2);
753+
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
754+
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "scan"]);
755+
}
756+
757+
#[test]
758+
fn test_split_into_chunks_respects_bytes() {
759+
// All row groups are small in rows but the second one is big enough
760+
// that it must start a new chunk on byte budget alone.
761+
let meta = row_groups_with_bytes(&[(10, 500), (10, 600), (10, 100), (10, 100)]);
762+
let plan = ParquetAccessPlan::new_all(4);
763+
let chunks = plan.split_into_chunks(&meta, 1_000_000, 1_000);
764+
assert_eq!(chunks.len(), 2);
765+
assert_eq!(
766+
access_kinds(&chunks[0]),
767+
vec!["scan", "skip", "skip", "skip"]
768+
);
769+
assert_eq!(
770+
access_kinds(&chunks[1]),
771+
vec!["skip", "scan", "scan", "scan"]
772+
);
773+
}
774+
775+
#[test]
776+
fn test_split_into_chunks_with_skip_preserved() {
777+
// Skip entries are carried by whichever chunk is currently being
778+
// grown and never contribute to the row/byte budget, so here all
779+
// three scan row groups fit together despite the wide skip in the
780+
// middle.
781+
let meta =
782+
row_groups_with_bytes(&[(30, 100), (1_000, 500), (30, 100), (30, 100)]);
783+
let plan = ParquetAccessPlan::new(vec![
784+
RowGroupAccess::Scan,
785+
RowGroupAccess::Skip,
786+
RowGroupAccess::Scan,
787+
RowGroupAccess::Scan,
788+
]);
789+
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
790+
assert_eq!(chunks.len(), 1);
791+
assert_eq!(
792+
access_kinds(&chunks[0]),
793+
vec!["scan", "skip", "scan", "scan"]
794+
);
795+
}
796+
797+
#[test]
798+
fn test_split_into_chunks_skip_between_chunks() {
799+
// When a chunk closes on budget, a following Skip is picked up by the
800+
// next chunk rather than creating an empty one.
801+
let meta = row_groups_with_bytes(&[(50, 100), (50, 100), (50, 100), (50, 100)]);
802+
let plan = ParquetAccessPlan::new(vec![
803+
RowGroupAccess::Scan,
804+
RowGroupAccess::Scan,
805+
RowGroupAccess::Skip,
806+
RowGroupAccess::Scan,
807+
]);
808+
let chunks = plan.split_into_chunks(&meta, 100, 10_000);
809+
assert_eq!(chunks.len(), 2);
810+
assert_eq!(
811+
access_kinds(&chunks[0]),
812+
vec!["scan", "scan", "skip", "skip"]
813+
);
814+
// rg2's Skip still lives in chunk 0 because chunk 0 was still open
815+
// when we hit rg2; chunk 1 only covers rg3.
816+
assert_eq!(
817+
access_kinds(&chunks[1]),
818+
vec!["skip", "skip", "skip", "scan"]
819+
);
820+
}
821+
822+
#[test]
823+
fn test_split_into_chunks_preserves_selection() {
824+
let meta = row_groups_with_bytes(&[(10, 100), (20, 100), (30, 100)]);
825+
let selection: RowSelection =
826+
vec![RowSelector::select(5), RowSelector::skip(15)].into();
827+
let plan = ParquetAccessPlan::new(vec![
828+
RowGroupAccess::Scan,
829+
RowGroupAccess::Selection(selection),
830+
RowGroupAccess::Scan,
831+
]);
832+
// Budget forces each row group into its own chunk.
833+
let chunks = plan.split_into_chunks(&meta, 15, 10_000);
834+
assert_eq!(chunks.len(), 3);
835+
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
836+
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "sel", "skip"]);
837+
assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]);
838+
// The Selection must be preserved verbatim in its chunk.
839+
let RowGroupAccess::Selection(sel) = &chunks[1].inner()[1] else {
840+
panic!("expected Selection preserved in chunk");
841+
};
842+
let selectors: Vec<_> = sel.clone().into();
843+
assert_eq!(selectors.len(), 2);
844+
assert_eq!((selectors[0].skip, selectors[0].row_count), (false, 5));
845+
assert_eq!((selectors[1].skip, selectors[1].row_count), (true, 15));
846+
}
847+
603848
/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
604849
fn get_test_schema_descr() -> SchemaDescPtr {
605850
use parquet::basic::Type as PhysicalType;

0 commit comments

Comments
 (0)