Skip to content

Commit f10157f

Browse files
authored
row range scans to prune unneeded splits (#4602)
Signed-off-by: Onur Satici <onur@spiraldb.com>
1 parent ff299eb commit f10157f

4 files changed

Lines changed: 36 additions & 48 deletions

File tree

vortex-file/src/file.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use std::ops::Range;
1010
use std::sync::Arc;
1111

12+
use itertools::Itertools;
1213
use vortex_array::ArrayRef;
1314
use vortex_array::stats::StatsSet;
1415
use vortex_dtype::{DType, Field, FieldMask, FieldPath, FieldPathSet};
@@ -136,6 +137,11 @@ impl VortexFile {
136137
}
137138

138139
pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
139-
SplitBy::Layout.splits(self.layout_reader()?.as_ref(), &[FieldMask::All])
140+
Ok(SplitBy::Layout
141+
.splits(self.layout_reader()?.as_ref(), &[FieldMask::All])?
142+
.into_iter()
143+
.tuple_windows()
144+
.map(|(start, end)| start..end)
145+
.collect())
140146
}
141147
}

vortex-scan/src/lib.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::cmp;
4+
use std::collections::BTreeSet;
55
use std::ops::Range;
66
use std::sync::Arc;
7+
use std::{cmp, iter};
78

89
use futures::future::BoxFuture;
910
use itertools::Itertools;
@@ -326,7 +327,7 @@ pub struct RepeatedScan<A: 'static + Send> {
326327
/// The selection mask to apply to the selected row range.
327328
selection: Selection,
328329
/// The natural splits of the file.
329-
splits: Vec<Range<u64>>,
330+
splits: BTreeSet<u64>,
330331
/// The number of splits to make progress on concurrently **per-thread**.
331332
concurrency: usize,
332333
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
@@ -342,27 +343,38 @@ impl<A: 'static + Send> RepeatedScan<A> {
342343
&self,
343344
row_range: Option<Range<u64>>,
344345
) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
345-
let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
346-
347346
let ctx = Arc::new(TaskContext {
348-
row_range,
349347
selection: self.selection.clone(),
350348
filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
351349
reader: self.layout_reader.clone(),
352350
projection: self.projection.clone(),
353351
mapper: self.map_fn.clone(),
354352
});
355353

354+
let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
355+
let splits_iter: Box<dyn Iterator<Item = _>> = match row_range {
356+
None => Box::new(self.splits.iter().copied()),
357+
Some(range) => {
358+
if range.start > range.end {
359+
return Ok(Vec::new());
360+
}
361+
Box::new(
362+
iter::once(range.start)
363+
.chain(self.splits.range(range.clone()).copied())
364+
.chain(iter::once(range.end)),
365+
)
366+
}
367+
};
368+
356369
// Create a task that executes the full scan pipeline for each split.
357370
let mut limit = self.limit;
358-
let split_tasks = self
359-
.splits
360-
.iter()
361-
.filter_map(|split_range| {
362-
if limit.is_some_and(|l| l == 0) {
371+
let split_tasks = splits_iter
372+
.tuple_windows()
373+
.filter_map(|(start, end)| {
374+
if limit.is_some_and(|l| l == 0) || start >= end {
363375
None
364376
} else {
365-
Some(split_exec(ctx.clone(), split_range.clone(), limit.as_mut()))
377+
Some(split_exec(ctx.clone(), start..end, limit.as_mut()))
366378
}
367379
})
368380
.try_collect()?;

vortex-scan/src/split_by.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::collections::BTreeSet;
5-
use std::ops::Range;
5+
use std::iter::once;
66

7-
use itertools::Itertools;
87
use vortex_array::stats::StatBound;
98
use vortex_dtype::FieldMask;
109
use vortex_error::{VortexResult, vortex_err};
@@ -31,32 +30,21 @@ impl SplitBy {
3130
&self,
3231
layout_reader: &dyn LayoutReader,
3332
field_mask: &[FieldMask],
34-
) -> VortexResult<Vec<Range<u64>>> {
33+
) -> VortexResult<BTreeSet<u64>> {
3534
Ok(match *self {
3635
SplitBy::Layout => {
3736
let mut row_splits = BTreeSet::<u64>::new();
3837
row_splits.insert(0);
3938

4039
// Register the splits for all the layouts.
4140
layout_reader.register_splits(field_mask, 0, &mut row_splits)?;
42-
4341
row_splits
44-
.into_iter()
45-
.tuple_windows()
46-
.map(|(start, end)| start..end)
47-
.collect()
4842
}
4943
SplitBy::RowCount(n) => {
5044
let row_count = *layout_reader.row_count().to_exact().ok_or_else(|| {
5145
vortex_err!("Cannot split layout by row count, row count is not exact")
5246
})?;
53-
let mut splits =
54-
Vec::with_capacity(usize::try_from((row_count + n as u64) / n as u64)?);
55-
for start in (0..row_count).step_by(n) {
56-
let end = (start + n as u64).min(row_count);
57-
splits.push(start..end);
58-
}
59-
splits
47+
(0..row_count).step_by(n).chain(once(row_count)).collect()
6048
}
6149
})
6250
}
@@ -103,7 +91,7 @@ mod test {
10391
let splits = SplitBy::Layout
10492
.splits(reader.as_ref(), &[FieldMask::Exact(FieldPath::root())])
10593
.unwrap();
106-
assert_eq!(splits, vec![0..10]);
94+
assert_eq!(splits, [0, 10].into_iter().collect());
10795
}
10896

10997
#[test]
@@ -132,6 +120,6 @@ mod test {
132120
let splits = SplitBy::RowCount(3)
133121
.splits(reader.as_ref(), &[FieldMask::Exact(FieldPath::root())])
134122
.unwrap();
135-
assert_eq!(splits, vec![0..3, 3..6, 6..9, 9..10]);
123+
assert_eq!(splits, [0, 3, 6, 9, 10].into_iter().collect());
136124
}
137125
}

vortex-scan/src/tasks.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,8 @@ pub(super) fn split_exec<A: 'static + Send>(
3838
split: Range<u64>,
3939
limit: Option<&mut usize>,
4040
) -> VortexResult<TaskFuture<Option<A>>> {
41-
// Step 1: using the caller-provided row range and selection, attempt to disregard this split.
42-
let read_range = match &ctx.row_range {
43-
None => split,
44-
Some(row_range) => {
45-
if row_range.start >= split.end || row_range.end < split.start {
46-
// No overlap for this task
47-
return Ok(ok(None).boxed());
48-
}
49-
50-
let intersect_start = row_range.start.max(split.start);
51-
let intersect_end = row_range.end.min(split.end);
52-
intersect_start..intersect_end
53-
}
54-
};
55-
5641
// Apply the selection to calculate a read mask
57-
let read_mask = ctx.selection.row_mask(&read_range);
42+
let read_mask = ctx.selection.row_mask(&split);
5843
let row_range = read_mask.row_range();
5944
let row_mask = read_mask.mask().clone();
6045
if row_mask.all_false() {
@@ -167,9 +152,6 @@ pub(super) fn split_exec<A: 'static + Send>(
167152

168153
/// Information needed to execute a single split task.
169154
pub(super) struct TaskContext<A> {
170-
/// A caller-provided range of the file to read. All tasks should intersect their reads
171-
/// with this range to ensure that they are split as well.
172-
pub(super) row_range: Option<Range<u64>>,
173155
/// A row selection to apply.
174156
pub(super) selection: Selection,
175157
/// The shared filter expression.

0 commit comments

Comments
 (0)