Skip to content

Commit 2fe2a69

Browse files
authored
Use layout file splits when DF re-partitions individual files (#7591)
## Summary Instead of just splitting files arbitrarily, align it with split layouts to make better use of Vortex's internal pruning and other behaviors. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 029fb66 commit 2fe2a69

3 files changed

Lines changed: 321 additions & 58 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 139 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use datafusion_common::DataFusionError;
1010
use datafusion_common::Result as DFResult;
1111
use datafusion_common::ScalarValue;
1212
use datafusion_common::exec_datafusion_err;
13-
use datafusion_datasource::FileRange;
1413
use datafusion_datasource::PartitionedFile;
1514
use datafusion_datasource::TableSchema;
1615
use datafusion_datasource::file_stream::FileOpenFuture;
@@ -30,16 +29,19 @@ use futures::FutureExt;
3029
use futures::StreamExt;
3130
use futures::TryStreamExt;
3231
use futures::stream;
32+
use itertools::Itertools;
3333
use object_store::path::Path;
3434
use tracing::Instrument;
35-
use vortex::array::ArrayRef;
3635
use vortex::array::VortexSessionExecute;
3736
use vortex::array::arrow::ArrowArrayExecutor;
37+
use vortex::dtype::FieldMask;
3838
use vortex::error::VortexError;
39+
use vortex::error::VortexExpect;
3940
use vortex::file::OpenOptionsSessionExt;
4041
use vortex::io::InstrumentedReadAt;
4142
use vortex::layout::LayoutReader;
4243
use vortex::layout::scan::scan_builder::ScanBuilder;
44+
use vortex::layout::scan::split_by::SplitBy;
4345
use vortex::metrics::Label;
4446
use vortex::metrics::MetricsRegistry;
4547
use vortex::session::VortexSession;
@@ -88,6 +90,8 @@ pub(crate) struct VortexOpener {
8890
/// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache
8991
/// a file reader the first time we read a file.
9092
pub layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
93+
/// Shared full-file natural split ranges keyed by file path.
94+
pub natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
9195
/// Whether the query has output ordering specified
9296
pub has_output_ordering: bool,
9397

@@ -123,6 +127,7 @@ impl FileOpener for VortexOpener {
123127
let batch_size = self.batch_size;
124128
let limit = self.limit;
125129
let layout_reader = Arc::clone(&self.layout_readers);
130+
let natural_split_ranges = Arc::clone(&self.natural_split_ranges);
126131
let has_output_ordering = self.has_output_ordering;
127132
let scan_concurrency = self.scan_concurrency;
128133

@@ -302,6 +307,12 @@ impl FileOpener for VortexOpener {
302307
}
303308
};
304309

310+
let natural_split_ranges = natural_split_ranges_for_file(
311+
natural_split_ranges.as_ref(),
312+
&file.object_meta.location,
313+
&layout_reader,
314+
)?;
315+
305316
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
306317

307318
if let Some(extensions) = file.extensions
@@ -311,12 +322,22 @@ impl FileOpener for VortexOpener {
311322
}
312323

313324
if let Some(file_range) = file.range {
314-
scan_builder = apply_byte_range(
315-
file_range,
325+
let byte_range = Range {
326+
start: u64::try_from(file_range.start)
327+
.map_err(|_| exec_datafusion_err!("Vortex file range start is negative"))?,
328+
end: u64::try_from(file_range.end)
329+
.map_err(|_| exec_datafusion_err!("Vortex file range end is negative"))?,
330+
};
331+
332+
let Some(row_range) = split_aligned_row_range(
333+
byte_range,
316334
file.object_meta.size,
317-
vxf.row_count(),
318-
scan_builder,
319-
);
335+
natural_split_ranges.as_ref(),
336+
) else {
337+
return Ok(stream::empty().boxed());
338+
};
339+
340+
scan_builder = scan_builder.with_row_range(row_range);
320341
}
321342

322343
let filter = filter
@@ -421,33 +442,74 @@ impl FileOpener for VortexOpener {
421442
}
422443
}
423444

424-
/// If the file has a [`FileRange`], we translate it into a row range in the file for the scan.
425-
fn apply_byte_range(
426-
file_range: FileRange,
427-
total_size: u64,
428-
row_count: u64,
429-
scan_builder: ScanBuilder<ArrayRef>,
430-
) -> ScanBuilder<ArrayRef> {
431-
let row_range = byte_range_to_row_range(
432-
file_range.start as u64..file_range.end as u64,
433-
row_count,
434-
total_size,
435-
);
436-
437-
scan_builder.with_row_range(row_range)
445+
fn natural_split_ranges_for_file(
446+
natural_split_ranges: &DashMap<Path, Arc<[Range<u64>]>>,
447+
path: &Path,
448+
layout_reader: &Arc<dyn LayoutReader>,
449+
) -> DFResult<Arc<[Range<u64>]>> {
450+
if let Some(split_ranges) = natural_split_ranges.get(path) {
451+
return Ok(Arc::clone(split_ranges.value()));
452+
}
453+
454+
let split_ranges = compute_natural_split_ranges(layout_reader.as_ref())?;
455+
456+
match natural_split_ranges.entry(path.clone()) {
457+
Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
458+
Entry::Vacant(entry) => {
459+
entry.insert(Arc::clone(&split_ranges));
460+
Ok(split_ranges)
461+
}
462+
}
438463
}
439464

440-
fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
441-
debug_assert!(row_count > 0); // Asserted by an early exit check in VortexOpener::open
465+
fn compute_natural_split_ranges(layout_reader: &dyn LayoutReader) -> DFResult<Arc<[Range<u64>]>> {
466+
let row_count = layout_reader.row_count();
467+
let row_range = 0..row_count;
468+
let split_points: Vec<_> = SplitBy::Layout
469+
.splits(layout_reader, &row_range, &[FieldMask::All])
470+
.map_err(|e| exec_datafusion_err!("Failed to compute Vortex natural splits: {e}"))?
471+
.into_iter()
472+
.tuple_windows()
473+
.map(|(s, e)| s..e)
474+
.collect::<Vec<_>>();
475+
476+
Ok(split_points.into())
477+
}
442478

443-
let average_row = total_size / row_count;
444-
assert!(average_row > 0, "A row must always have at least one byte");
479+
/// Translate a DataFusion byte range to the contiguous natural split ranges it owns.
480+
fn split_aligned_row_range(
481+
byte_range: Range<u64>,
482+
total_size: u64,
483+
split_ranges: &[Range<u64>],
484+
) -> Option<Range<u64>> {
485+
if byte_range.start >= byte_range.end {
486+
return None;
487+
}
445488

446-
let start_row = byte_range.start / average_row;
447-
let end_row = byte_range.end / average_row;
489+
let row_count = split_ranges.last().map(|split| split.end)?;
490+
if row_count == 0 {
491+
return None;
492+
}
493+
494+
let mut owned_splits = split_ranges.iter().filter(|split_range| {
495+
let midpoint_byte = split_midpoint_to_byte(split_range, row_count, total_size);
496+
byte_range.contains(&midpoint_byte)
497+
});
448498

449-
// We take the min here as `end_row` might overshoot
450-
start_row..u64::min(row_count, end_row)
499+
let first_split = owned_splits.next()?;
500+
let mut row_range = first_split.start..first_split.end;
501+
for split_range in owned_splits {
502+
row_range.end = split_range.end;
503+
}
504+
505+
Some(row_range)
506+
}
507+
508+
fn split_midpoint_to_byte(split_range: &Range<u64>, row_count: u64, total_size: u64) -> u64 {
509+
let midpoint_row = split_range.start + (split_range.end - split_range.start) / 2;
510+
let midpoint_byte = (u128::from(midpoint_row) * u128::from(total_size)) / u128::from(row_count);
511+
512+
u64::try_from(midpoint_byte).vortex_expect("midpoint byte projection should fit into u64")
451513
}
452514

453515
#[cfg(test)]
@@ -500,43 +562,56 @@ mod tests {
500562
static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
501563

502564
#[rstest]
503-
#[case(0..100, 100, 100, 0..100)]
504-
#[case(0..105, 100, 105, 0..100)]
505-
#[case(0..50, 100, 105, 0..50)]
506-
#[case(50..105, 100, 105, 50..100)]
507-
#[case(0..1, 4, 8, 0..0)]
508-
#[case(1..8, 4, 8, 0..4)]
509-
fn test_range_translation(
565+
#[case(0..3, 10, vec![0..2, 2..5, 5..10], Some(0..2))]
566+
#[case(3..7, 10, vec![0..2, 2..5, 5..10], Some(2..5))]
567+
#[case(1..8, 10, vec![0..1, 1..9, 9..10], Some(1..9))]
568+
#[case(1..4, 16, vec![0..1, 1..2, 2..3, 3..4], None)]
569+
fn test_split_aligned_row_range(
510570
#[case] byte_range: Range<u64>,
511-
#[case] row_count: u64,
512571
#[case] total_size: u64,
513-
#[case] expected: Range<u64>,
572+
#[case] split_ranges: Vec<Range<u64>>,
573+
#[case] expected: Option<Range<u64>>,
514574
) {
515575
assert_eq!(
516-
byte_range_to_row_range(byte_range, row_count, total_size),
576+
split_aligned_row_range(byte_range, total_size, &split_ranges),
517577
expected
518578
);
519579
}
520580

521581
#[test]
522-
fn test_consecutive_ranges() {
523-
let row_count = 100;
524-
let total_size = 429;
525-
let bytes_a = 0..143;
526-
let bytes_b = 143..286;
527-
let bytes_c = 286..429;
528-
529-
let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
530-
let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
531-
let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);
532-
533-
assert_eq!(rows_a.end - rows_a.start, 35);
534-
assert_eq!(rows_b.end - rows_b.start, 36);
535-
assert_eq!(rows_c.end - rows_c.start, 29);
536-
537-
assert_eq!(rows_a.start, 0);
538-
assert_eq!(rows_c.end, 100);
539-
for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
582+
fn test_split_aligned_ranges_cover_splits_exactly_once() {
583+
let split_ranges = vec![0..1, 1..4, 4..10, 10..13];
584+
let byte_ranges = [0..4, 4..8, 8..12, 12..16];
585+
586+
let assigned = byte_ranges
587+
.into_iter()
588+
.filter_map(|byte_range| split_aligned_row_range(byte_range, 16, &split_ranges))
589+
.collect::<Vec<_>>();
590+
591+
assert_eq!(assigned, vec![0..4, 4..10, 10..13]);
592+
assert_eq!(
593+
assigned
594+
.iter()
595+
.map(|range| range.end - range.start)
596+
.sum::<u64>(),
597+
13
598+
);
599+
600+
let split_starts = split_ranges
601+
.iter()
602+
.map(|range| range.start)
603+
.collect::<Vec<_>>();
604+
let split_ends = split_ranges
605+
.iter()
606+
.map(|range| range.end)
607+
.collect::<Vec<_>>();
608+
609+
for range in &assigned {
610+
assert!(split_starts.contains(&range.start));
611+
assert!(split_ends.contains(&range.end));
612+
}
613+
614+
for (left, right) in assigned.iter().tuple_windows() {
540615
assert_eq!(left.end, right.start);
541616
}
542617
}
@@ -577,6 +652,7 @@ mod tests {
577652
limit: None,
578653
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
579654
layout_readers: Default::default(),
655+
natural_split_ranges: Default::default(),
580656
has_output_ordering: false,
581657
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
582658
file_metadata_cache: None,
@@ -650,7 +726,7 @@ mod tests {
650726

651727
let file_schema = data_batch.schema();
652728
// Parallel scans may attach a byte range even for empty files; the
653-
// opener must not call byte_range_to_row_range when the row_count is 0.
729+
// opener must return early before attempting split-aligned translation.
654730
let file =
655731
PartitionedFile::new_with_range(file_path.to_string(), file_size, 0, file_size as i64);
656732

@@ -708,6 +784,7 @@ mod tests {
708784
limit: None,
709785
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
710786
layout_readers: Default::default(),
787+
natural_split_ranges: Default::default(),
711788
has_output_ordering: false,
712789
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
713790
file_metadata_cache: None,
@@ -794,6 +871,7 @@ mod tests {
794871
limit: None,
795872
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
796873
layout_readers: Default::default(),
874+
natural_split_ranges: Default::default(),
797875
has_output_ordering: false,
798876
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
799877
file_metadata_cache: None,
@@ -950,6 +1028,7 @@ mod tests {
9501028
limit: None,
9511029
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
9521030
layout_readers: Default::default(),
1031+
natural_split_ranges: Default::default(),
9531032
has_output_ordering: false,
9541033
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
9551034
file_metadata_cache: None,
@@ -1009,6 +1088,7 @@ mod tests {
10091088
limit: None,
10101089
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
10111090
layout_readers: Default::default(),
1091+
natural_split_ranges: Default::default(),
10121092
has_output_ordering: false,
10131093
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
10141094
file_metadata_cache: None,
@@ -1212,6 +1292,7 @@ mod tests {
12121292
limit: None,
12131293
metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
12141294
layout_readers: Default::default(),
1295+
natural_split_ranges: Default::default(),
12151296
has_output_ordering: false,
12161297
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
12171298
file_metadata_cache: None,

vortex-datafusion/src/persistent/source.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::any::Any;
55
use std::fmt::Formatter;
6+
use std::ops::Range;
67
use std::sync::Arc;
78
use std::sync::Weak;
89

@@ -185,6 +186,8 @@ pub struct VortexSource {
185186
///
186187
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
187188
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
189+
/// Shared full-file natural split ranges keyed by path.
190+
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
188191
expression_convertor: Arc<dyn ExpressionConvertor>,
189192
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
190193
vx_metrics_registry: Arc<dyn MetricsRegistry>,
@@ -216,6 +219,7 @@ impl VortexSource {
216219
batch_size: None,
217220
_unused_df_metrics: Default::default(),
218221
layout_readers: Arc::new(DashMap::default()),
222+
natural_split_ranges: Arc::new(DashMap::default()),
219223
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
220224
vortex_reader_factory: None,
221225
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
@@ -333,6 +337,7 @@ impl FileSource for VortexSource {
333337
limit: base_config.limit.map(|l| l as u64),
334338
metrics_registry: Arc::clone(&self.vx_metrics_registry),
335339
layout_readers: Arc::clone(&self.layout_readers),
340+
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
336341
has_output_ordering: !base_config.output_ordering.is_empty(),
337342
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
338343
file_metadata_cache: self.file_metadata_cache.clone(),

0 commit comments

Comments
 (0)