Skip to content

Commit e6024a0

Browse files
committed
register_splits to get both offset and relative row_range
Signed-off-by: Onur Satici <onur@spiraldb.com>
1 parent 7349cd6 commit e6024a0

13 files changed

Lines changed: 227 additions & 59 deletions

File tree

vortex-cuda/src/layout.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use vortex::layout::LayoutReader;
4949
use vortex::layout::LayoutReaderRef;
5050
use vortex::layout::LayoutRef;
5151
use vortex::layout::LayoutStrategy;
52+
use vortex::layout::SplitRange;
5253
use vortex::layout::VTable;
5354
use vortex::layout::layouts::SharedArrayFuture;
5455
use vortex::layout::segments::SegmentId;
@@ -284,10 +285,11 @@ impl LayoutReader for CudaFlatReader {
284285
fn register_splits(
285286
&self,
286287
_field_mask: &[FieldMask],
287-
row_range: &Range<u64>,
288+
split_range: &SplitRange,
288289
splits: &mut BTreeSet<u64>,
289290
) -> VortexResult<()> {
290-
splits.insert(row_range.start + self.layout.row_count);
291+
split_range.check_bounds(self.layout.row_count)?;
292+
splits.insert(split_range.root_row_range().end);
291293
Ok(())
292294
}
293295

vortex-file/public-api.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub fn vortex_file::v2::FileStatsLayoutReader::projection_evaluation(&self, &cor
9090

9191
pub fn vortex_file::v2::FileStatsLayoutReader::pruning_evaluation(&self, &core::ops::range::Range<u64>, &vortex_array::expr::expression::Expression, vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
9292

93-
pub fn vortex_file::v2::FileStatsLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &core::ops::range::Range<u64>, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
93+
pub fn vortex_file::v2::FileStatsLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &vortex_layout::reader::SplitRange, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
9494

9595
pub fn vortex_file::v2::FileStatsLayoutReader::row_count(&self) -> u64
9696

vortex-file/src/v2/file_stats_reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use vortex_error::VortexResult;
3232
use vortex_layout::ArrayFuture;
3333
use vortex_layout::LayoutReader;
3434
use vortex_layout::LayoutReaderRef;
35+
use vortex_layout::SplitRange;
3536
use vortex_mask::Mask;
3637
use vortex_session::VortexSession;
3738
use vortex_utils::aliases::dash_map::DashMap;
@@ -156,10 +157,10 @@ impl LayoutReader for FileStatsLayoutReader {
156157
fn register_splits(
157158
&self,
158159
field_mask: &[FieldMask],
159-
row_range: &Range<u64>,
160+
split_range: &SplitRange,
160161
splits: &mut BTreeSet<u64>,
161162
) -> VortexResult<()> {
162-
self.child.register_splits(field_mask, row_range, splits)
163+
self.child.register_splits(field_mask, split_range, splits)
163164
}
164165

165166
fn pruning_evaluation(

vortex-layout/public-api.lock

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_evaluatio
636636

637637
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::pruning_evaluation(&self, &core::ops::range::Range<u64>, &vortex_array::expr::expression::Expression, vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
638638

639-
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &core::ops::range::Range<u64>, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
639+
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &vortex_layout::SplitRange, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
640640

641641
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64
642642

@@ -1712,6 +1712,42 @@ pub fn vortex_layout::LazyReaderChildren::get(&self, usize) -> vortex_error::Vor
17121712

17131713
pub fn vortex_layout::LazyReaderChildren::new(alloc::sync::Arc<dyn vortex_layout::LayoutChildren>, alloc::vec::Vec<vortex_array::dtype::DType>, alloc::vec::Vec<alloc::sync::Arc<str>>, alloc::sync::Arc<dyn vortex_layout::segments::SegmentSource>, vortex_session::VortexSession) -> Self
17141714

1715+
pub struct vortex_layout::SplitRange
1716+
1717+
impl vortex_layout::SplitRange
1718+
1719+
pub fn vortex_layout::SplitRange::check_bounds(&self, u64) -> vortex_error::VortexResult<()>
1720+
1721+
pub fn vortex_layout::SplitRange::is_empty(&self) -> bool
1722+
1723+
pub fn vortex_layout::SplitRange::len(&self) -> u64
1724+
1725+
pub fn vortex_layout::SplitRange::root(core::ops::range::Range<u64>) -> vortex_error::VortexResult<Self>
1726+
1727+
pub fn vortex_layout::SplitRange::root_row_range(&self) -> core::ops::range::Range<u64>
1728+
1729+
pub fn vortex_layout::SplitRange::row_offset(&self) -> u64
1730+
1731+
pub fn vortex_layout::SplitRange::row_range(&self) -> &core::ops::range::Range<u64>
1732+
1733+
pub fn vortex_layout::SplitRange::try_new(u64, core::ops::range::Range<u64>) -> vortex_error::VortexResult<Self>
1734+
1735+
impl core::clone::Clone for vortex_layout::SplitRange
1736+
1737+
pub fn vortex_layout::SplitRange::clone(&self) -> vortex_layout::SplitRange
1738+
1739+
impl core::cmp::Eq for vortex_layout::SplitRange
1740+
1741+
impl core::cmp::PartialEq for vortex_layout::SplitRange
1742+
1743+
pub fn vortex_layout::SplitRange::eq(&self, &vortex_layout::SplitRange) -> bool
1744+
1745+
impl core::fmt::Debug for vortex_layout::SplitRange
1746+
1747+
pub fn vortex_layout::SplitRange::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result
1748+
1749+
impl core::marker::StructuralPartialEq for vortex_layout::SplitRange
1750+
17151751
pub trait vortex_layout::ArrayFutureExt
17161752

17171753
pub fn vortex_layout::ArrayFutureExt::masked(self, vortex_array::mask_future::MaskFuture) -> Self
@@ -1846,7 +1882,7 @@ pub fn vortex_layout::LayoutReader::projection_evaluation(&self, &core::ops::ran
18461882

18471883
pub fn vortex_layout::LayoutReader::pruning_evaluation(&self, &core::ops::range::Range<u64>, &vortex_array::expr::expression::Expression, vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
18481884

1849-
pub fn vortex_layout::LayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &core::ops::range::Range<u64>, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
1885+
pub fn vortex_layout::LayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &vortex_layout::SplitRange, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
18501886

18511887
pub fn vortex_layout::LayoutReader::row_count(&self) -> u64
18521888

@@ -1864,7 +1900,7 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::projection_evaluatio
18641900

18651901
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::pruning_evaluation(&self, &core::ops::range::Range<u64>, &vortex_array::expr::expression::Expression, vortex_mask::Mask) -> vortex_error::VortexResult<vortex_array::mask_future::MaskFuture>
18661902

1867-
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &core::ops::range::Range<u64>, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
1903+
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&self, &[vortex_array::dtype::field_mask::FieldMask], &vortex_layout::SplitRange, &mut alloc::collections::btree::set::BTreeSet<u64>) -> vortex_error::VortexResult<()>
18681904

18691905
pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64
18701906

vortex-layout/src/layouts/chunked/reader.rs

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use futures::FutureExt;
1010
use futures::TryStreamExt;
1111
use futures::future::BoxFuture;
1212
use futures::stream::FuturesOrdered;
13-
use itertools::Itertools;
1413
use tracing::trace;
1514
use vortex_array::ArrayRef;
1615
use vortex_array::Canonical;
@@ -30,6 +29,7 @@ use crate::LayoutReaderRef;
3029
use crate::LazyReaderChildren;
3130
use crate::layouts::chunked::ChunkedLayout;
3231
use crate::reader::LayoutReader;
32+
use crate::reader::SplitRange;
3333
use crate::segments::SegmentSource;
3434

3535
/// A [`LayoutReader`] for chunked layouts.
@@ -107,10 +107,11 @@ impl ChunkedReader {
107107
fn ranges<'a>(
108108
&'a self,
109109
row_range: &'a Range<u64>,
110-
) -> impl Iterator<Item = (usize, Range<u64>, Range<usize>)> + 'a {
110+
) -> impl Iterator<Item = (usize, u64, Range<u64>, Range<usize>)> + 'a {
111111
self.chunk_range(row_range).map(move |chunk_idx| {
112112
// Figure out the chunk row range relative to the mask's row range.
113113
let chunk_row_range = self.chunk_offset(chunk_idx)..self.chunk_offset(chunk_idx + 1);
114+
let chunk_start = chunk_row_range.start;
114115

115116
// Find the intersection of the mask and the chunk row ranges.
116117
let intersecting_row_range =
@@ -146,7 +147,7 @@ impl ChunkedReader {
146147
.vortex_expect("Chunk range calculation overflow");
147148
let chunk_range = chunk_relative_start..chunk_relative_end;
148149

149-
(chunk_idx, chunk_range, mask_range)
150+
(chunk_idx, chunk_start, chunk_range, mask_range)
150151
})
151152
}
152153
}
@@ -167,37 +168,32 @@ impl LayoutReader for ChunkedReader {
167168
fn register_splits(
168169
&self,
169170
field_mask: &[FieldMask],
170-
row_range: &Range<u64>,
171+
split_range: &SplitRange,
171172
splits: &mut BTreeSet<u64>,
172173
) -> VortexResult<()> {
173-
if row_range.is_empty() {
174+
split_range.check_bounds(self.layout.row_count())?;
175+
176+
if split_range.is_empty() {
174177
return Ok(());
175178
}
176179

177-
for (index, (&start, &end)) in self
178-
.chunk_offsets
179-
.iter()
180-
.tuple_windows::<(_, _)>()
181-
.enumerate()
182-
{
183-
if end < row_range.start {
184-
continue;
185-
}
180+
for (chunk_idx, chunk_start, child_range, _) in self.ranges(split_range.row_range()) {
181+
let child = self.chunk_reader(chunk_idx)?;
182+
let child_row_offset = split_range
183+
.row_offset()
184+
.checked_add(chunk_start)
185+
.vortex_expect("Chunked layout split offset overflow");
186+
let child_split_range = SplitRange::try_new(child_row_offset, child_range)?;
186187

187-
if start >= row_range.end {
188-
break;
189-
}
190-
191-
// Child overlaps in whole or in part with split
192-
let child = self.chunk_reader(index)?;
193-
let child_range =
194-
std::cmp::max(row_range.start, start)..std::cmp::min(row_range.end, end);
195-
196-
// Register any splits from the child
197-
child.register_splits(field_mask, &child_range, splits)?;
188+
child.register_splits(field_mask, &child_split_range, splits)?;
198189

199190
// Register the split indicating the end of this chunk
200-
splits.insert(child_range.end);
191+
splits.insert(
192+
split_range
193+
.row_offset()
194+
.checked_add(chunk_start + child_split_range.row_range().end)
195+
.vortex_expect("Chunked layout split offset overflow"),
196+
);
201197
}
202198

203199
Ok(())
@@ -215,7 +211,7 @@ impl LayoutReader for ChunkedReader {
215211

216212
let mut chunk_evals = vec![];
217213

218-
for (chunk_idx, chunk_range, mask_range) in self.ranges(row_range) {
214+
for (chunk_idx, _, chunk_range, mask_range) in self.ranges(row_range) {
219215
let chunk_reader = self.chunk_reader(chunk_idx)?;
220216
let chunk_eval = chunk_reader
221217
.pruning_evaluation(&chunk_range, expr, mask.slice(mask_range))
@@ -261,7 +257,7 @@ impl LayoutReader for ChunkedReader {
261257

262258
let mut chunk_evals = vec![];
263259

264-
for (chunk_idx, chunk_range, mask_range) in self.ranges(row_range) {
260+
for (chunk_idx, _, chunk_range, mask_range) in self.ranges(row_range) {
265261
let chunk_reader = self.chunk_reader(chunk_idx)?;
266262
let chunk_eval = chunk_reader
267263
.filter_evaluation(&chunk_range, expr, mask.slice(mask_range))
@@ -301,7 +297,7 @@ impl LayoutReader for ChunkedReader {
301297

302298
let mut chunk_evals = vec![];
303299

304-
for (chunk_idx, chunk_range, mask_range) in self.ranges(row_range) {
300+
for (chunk_idx, _, chunk_range, mask_range) in self.ranges(row_range) {
305301
let chunk_reader = self.chunk_reader(chunk_idx)?;
306302
let chunk_eval = chunk_reader
307303
.projection_evaluation(&chunk_range, expr, mask.slice(mask_range))
@@ -343,17 +339,25 @@ mod test {
343339
use vortex_array::MaskFuture;
344340
use vortex_array::assert_arrays_eq;
345341
use vortex_array::dtype::DType;
342+
use vortex_array::dtype::FieldMask;
346343
use vortex_array::dtype::Nullability::NonNullable;
347344
use vortex_array::dtype::PType;
348345
use vortex_array::expr::root;
349346
use vortex_buffer::buffer;
350347
use vortex_io::runtime::single::block_on;
351348
use vortex_io::session::RuntimeSessionExt;
349+
use vortex_session::registry::ReadContext;
352350

351+
use crate::IntoLayout;
353352
use crate::LayoutRef;
354353
use crate::LayoutStrategy;
354+
use crate::OwnedLayoutChildren;
355+
use crate::layouts::chunked::ChunkedLayout;
355356
use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
357+
use crate::layouts::flat::FlatLayout;
356358
use crate::layouts::flat::writer::FlatLayoutStrategy;
359+
use crate::scan::split_by::SplitBy;
360+
use crate::segments::SegmentId;
357361
use crate::segments::SegmentSource;
358362
use crate::segments::TestSegments;
359363
use crate::sequence::SequenceId;
@@ -395,6 +399,52 @@ mod test {
395399
(segments, layout)
396400
}
397401

402+
fn nested_chunked_layout() -> LayoutRef {
403+
let dtype = DType::Primitive(PType::U8, NonNullable);
404+
let ctx = ReadContext::new([]);
405+
let flat = |segment_id| {
406+
FlatLayout::new(5, dtype.clone(), SegmentId::from(segment_id), ctx.clone())
407+
.into_layout()
408+
};
409+
let nested = |first_segment_id| {
410+
ChunkedLayout::new(
411+
10,
412+
dtype.clone(),
413+
OwnedLayoutChildren::layout_children(vec![
414+
flat(first_segment_id),
415+
flat(first_segment_id + 1),
416+
]),
417+
)
418+
.into_layout()
419+
};
420+
421+
ChunkedLayout::new(
422+
30,
423+
dtype.clone(),
424+
OwnedLayoutChildren::layout_children(vec![nested(0), nested(2), nested(4)]),
425+
)
426+
.into_layout()
427+
}
428+
429+
#[rstest]
430+
#[case(0..30, [0, 5, 10, 15, 20, 25, 30])]
431+
#[case(7..23, [7, 10, 15, 20, 23])]
432+
fn test_nested_chunked_layout_splits(
433+
#[case] row_range: std::ops::Range<u64>,
434+
#[case] expected: impl IntoIterator<Item = u64>,
435+
) {
436+
let layout = nested_chunked_layout();
437+
let reader = layout
438+
.new_reader("".into(), Arc::new(TestSegments::default()), &SESSION)
439+
.unwrap();
440+
441+
let splits = SplitBy::Layout
442+
.splits(reader.as_ref(), &row_range, &[FieldMask::All])
443+
.unwrap();
444+
445+
assert_eq!(splits, expected.into_iter().collect());
446+
}
447+
398448
#[rstest]
399449
fn test_chunked_evaluator(
400450
#[from(chunked_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),

vortex-layout/src/layouts/dict/reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use vortex_utils::aliases::dash_map::DashMap;
3232
use super::DictLayout;
3333
use crate::LayoutReader;
3434
use crate::LayoutReaderRef;
35+
use crate::SplitRange;
3536
use crate::layouts::SharedArrayFuture;
3637
use crate::segments::SegmentSource;
3738

@@ -166,10 +167,10 @@ impl LayoutReader for DictReader {
166167
fn register_splits(
167168
&self,
168169
field_mask: &[FieldMask],
169-
row_range: &Range<u64>,
170+
split_range: &SplitRange,
170171
splits: &mut BTreeSet<u64>,
171172
) -> VortexResult<()> {
172-
self.codes.register_splits(field_mask, row_range, splits)
173+
self.codes.register_splits(field_mask, split_range, splits)
173174
}
174175

175176
fn pruning_evaluation(

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ use vortex_error::VortexResult;
2121
use vortex_mask::Mask;
2222
use vortex_session::VortexSession;
2323

24-
use crate::LayoutReader;
2524
use crate::layouts::SharedArrayFuture;
2625
use crate::layouts::flat::FlatLayout;
26+
use crate::reader::LayoutReader;
27+
use crate::reader::SplitRange;
2728
use crate::segments::SegmentSource;
2829

2930
/// The threshold of mask density below which we will evaluate the expression only over the
@@ -103,10 +104,11 @@ impl LayoutReader for FlatReader {
103104
fn register_splits(
104105
&self,
105106
_field_mask: &[FieldMask],
106-
row_range: &Range<u64>,
107+
split_range: &SplitRange,
107108
splits: &mut BTreeSet<u64>,
108109
) -> VortexResult<()> {
109-
splits.insert(row_range.start + self.layout.row_count);
110+
split_range.check_bounds(self.layout.row_count)?;
111+
splits.insert(split_range.root_row_range().end);
110112
Ok(())
111113
}
112114

vortex-layout/src/layouts/row_idx/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use vortex_utils::aliases::dash_map::DashMap;
4343

4444
use crate::ArrayFuture;
4545
use crate::LayoutReader;
46+
use crate::SplitRange;
4647
use crate::layouts::partitioned::PartitionedExprEval;
4748

4849
pub struct RowIdxLayoutReader {
@@ -170,10 +171,10 @@ impl LayoutReader for RowIdxLayoutReader {
170171
fn register_splits(
171172
&self,
172173
field_mask: &[FieldMask],
173-
row_range: &Range<u64>,
174+
split_range: &SplitRange,
174175
splits: &mut BTreeSet<u64>,
175176
) -> VortexResult<()> {
176-
self.child.register_splits(field_mask, row_range, splits)
177+
self.child.register_splits(field_mask, split_range, splits)
177178
}
178179

179180
fn pruning_evaluation(

0 commit comments

Comments
 (0)