@@ -10,7 +10,6 @@ use futures::FutureExt;
1010use futures:: TryStreamExt ;
1111use futures:: future:: BoxFuture ;
1212use futures:: stream:: FuturesOrdered ;
13- use itertools:: Itertools ;
1413use tracing:: trace;
1514use vortex_array:: ArrayRef ;
1615use vortex_array:: Canonical ;
@@ -30,6 +29,7 @@ use crate::LayoutReaderRef;
3029use crate :: LazyReaderChildren ;
3130use crate :: layouts:: chunked:: ChunkedLayout ;
3231use crate :: reader:: LayoutReader ;
32+ use crate :: reader:: SplitRange ;
3333use crate :: segments:: SegmentSource ;
3434
3535/// A [`LayoutReader`] for chunked layouts.
@@ -107,10 +107,11 @@ impl ChunkedReader {
107107 fn ranges < ' a > (
108108 & ' a self ,
109109 row_range : & ' a Range < u64 > ,
110- ) -> impl Iterator < Item = ( usize , Range < u64 > , Range < usize > ) > + ' a {
110+ ) -> impl Iterator < Item = ( usize , u64 , Range < u64 > , Range < usize > ) > + ' a {
111111 self . chunk_range ( row_range) . map ( move |chunk_idx| {
112112 // Figure out the chunk row range relative to the mask's row range.
113113 let chunk_row_range = self . chunk_offset ( chunk_idx) ..self . chunk_offset ( chunk_idx + 1 ) ;
114+ let chunk_start = chunk_row_range. start ;
114115
115116 // Find the intersection of the mask and the chunk row ranges.
116117 let intersecting_row_range =
@@ -146,7 +147,7 @@ impl ChunkedReader {
146147 . vortex_expect ( "Chunk range calculation overflow" ) ;
147148 let chunk_range = chunk_relative_start..chunk_relative_end;
148149
149- ( chunk_idx, chunk_range, mask_range)
150+ ( chunk_idx, chunk_start , chunk_range, mask_range)
150151 } )
151152 }
152153}
@@ -167,37 +168,32 @@ impl LayoutReader for ChunkedReader {
167168 fn register_splits (
168169 & self ,
169170 field_mask : & [ FieldMask ] ,
170- row_range : & Range < u64 > ,
171+ split_range : & SplitRange ,
171172 splits : & mut BTreeSet < u64 > ,
172173 ) -> VortexResult < ( ) > {
173- if row_range. is_empty ( ) {
174+ split_range. check_bounds ( self . layout . row_count ( ) ) ?;
175+
176+ if split_range. is_empty ( ) {
174177 return Ok ( ( ) ) ;
175178 }
176179
177- for ( index, ( & start, & end) ) in self
178- . chunk_offsets
179- . iter ( )
180- . tuple_windows :: < ( _ , _ ) > ( )
181- . enumerate ( )
182- {
183- if end < row_range. start {
184- continue ;
185- }
180+ for ( chunk_idx, chunk_start, child_range, _) in self . ranges ( split_range. row_range ( ) ) {
181+ let child = self . chunk_reader ( chunk_idx) ?;
182+ let child_row_offset = split_range
183+ . row_offset ( )
184+ . checked_add ( chunk_start)
185+ . vortex_expect ( "Chunked layout split offset overflow" ) ;
186+ let child_split_range = SplitRange :: try_new ( child_row_offset, child_range) ?;
186187
187- if start >= row_range. end {
188- break ;
189- }
190-
191- // Child overlaps in whole or in part with split
192- let child = self . chunk_reader ( index) ?;
193- let child_range =
194- std:: cmp:: max ( row_range. start , start) ..std:: cmp:: min ( row_range. end , end) ;
195-
196- // Register any splits from the child
197- child. register_splits ( field_mask, & child_range, splits) ?;
188+ child. register_splits ( field_mask, & child_split_range, splits) ?;
198189
199190 // Register the split indicating the end of this chunk
200- splits. insert ( child_range. end ) ;
191+ splits. insert (
192+ split_range
193+ . row_offset ( )
194+ . checked_add ( chunk_start + child_split_range. row_range ( ) . end )
195+ . vortex_expect ( "Chunked layout split offset overflow" ) ,
196+ ) ;
201197 }
202198
203199 Ok ( ( ) )
@@ -215,7 +211,7 @@ impl LayoutReader for ChunkedReader {
215211
216212 let mut chunk_evals = vec ! [ ] ;
217213
218- for ( chunk_idx, chunk_range, mask_range) in self . ranges ( row_range) {
214+ for ( chunk_idx, _ , chunk_range, mask_range) in self . ranges ( row_range) {
219215 let chunk_reader = self . chunk_reader ( chunk_idx) ?;
220216 let chunk_eval = chunk_reader
221217 . pruning_evaluation ( & chunk_range, expr, mask. slice ( mask_range) )
@@ -261,7 +257,7 @@ impl LayoutReader for ChunkedReader {
261257
262258 let mut chunk_evals = vec ! [ ] ;
263259
264- for ( chunk_idx, chunk_range, mask_range) in self . ranges ( row_range) {
260+ for ( chunk_idx, _ , chunk_range, mask_range) in self . ranges ( row_range) {
265261 let chunk_reader = self . chunk_reader ( chunk_idx) ?;
266262 let chunk_eval = chunk_reader
267263 . filter_evaluation ( & chunk_range, expr, mask. slice ( mask_range) )
@@ -301,7 +297,7 @@ impl LayoutReader for ChunkedReader {
301297
302298 let mut chunk_evals = vec ! [ ] ;
303299
304- for ( chunk_idx, chunk_range, mask_range) in self . ranges ( row_range) {
300+ for ( chunk_idx, _ , chunk_range, mask_range) in self . ranges ( row_range) {
305301 let chunk_reader = self . chunk_reader ( chunk_idx) ?;
306302 let chunk_eval = chunk_reader
307303 . projection_evaluation ( & chunk_range, expr, mask. slice ( mask_range) )
@@ -343,17 +339,25 @@ mod test {
343339 use vortex_array:: MaskFuture ;
344340 use vortex_array:: assert_arrays_eq;
345341 use vortex_array:: dtype:: DType ;
342+ use vortex_array:: dtype:: FieldMask ;
346343 use vortex_array:: dtype:: Nullability :: NonNullable ;
347344 use vortex_array:: dtype:: PType ;
348345 use vortex_array:: expr:: root;
349346 use vortex_buffer:: buffer;
350347 use vortex_io:: runtime:: single:: block_on;
351348 use vortex_io:: session:: RuntimeSessionExt ;
349+ use vortex_session:: registry:: ReadContext ;
352350
351+ use crate :: IntoLayout ;
353352 use crate :: LayoutRef ;
354353 use crate :: LayoutStrategy ;
354+ use crate :: OwnedLayoutChildren ;
355+ use crate :: layouts:: chunked:: ChunkedLayout ;
355356 use crate :: layouts:: chunked:: writer:: ChunkedLayoutStrategy ;
357+ use crate :: layouts:: flat:: FlatLayout ;
356358 use crate :: layouts:: flat:: writer:: FlatLayoutStrategy ;
359+ use crate :: scan:: split_by:: SplitBy ;
360+ use crate :: segments:: SegmentId ;
357361 use crate :: segments:: SegmentSource ;
358362 use crate :: segments:: TestSegments ;
359363 use crate :: sequence:: SequenceId ;
@@ -395,6 +399,52 @@ mod test {
395399 ( segments, layout)
396400 }
397401
/// Test fixture: builds a chunked layout whose three children are themselves
/// chunked layouts, each wrapping two flat layouts.
///
/// The leading integer passed to each constructor (5, 10, 30) is presumably the
/// row count of that layout — TODO confirm against `FlatLayout::new` /
/// `ChunkedLayout::new`. The sizes are consistent with that reading:
/// 2 x 5 = 10 per nested chunk and 3 x 10 = 30 overall.
402+ fn nested_chunked_layout ( ) -> LayoutRef {
403+ let dtype = DType :: Primitive ( PType :: U8 , NonNullable ) ;
404+ let ctx = ReadContext :: new ( [ ] ) ;
// Leaf builder: one flat layout per segment id, all sharing the same dtype and context.
405+ let flat = |segment_id| {
406+ FlatLayout :: new ( 5 , dtype. clone ( ) , SegmentId :: from ( segment_id) , ctx. clone ( ) )
407+ . into_layout ( )
408+ } ;
// Inner builder: a chunked layout over two consecutive leaves, using segment ids
// `first_segment_id` and `first_segment_id + 1`.
409+ let nested = |first_segment_id| {
410+ ChunkedLayout :: new (
411+ 10 ,
412+ dtype. clone ( ) ,
413+ OwnedLayoutChildren :: layout_children ( vec ! [
414+ flat( first_segment_id) ,
415+ flat( first_segment_id + 1 ) ,
416+ ] ) ,
417+ )
418+ . into_layout ( )
419+ } ;
420+
// Outer layout: three inner chunked layouts starting at segment ids 0, 2 and 4,
// so the six leaves use segment ids 0 through 5 with no overlap.
421+ ChunkedLayout :: new (
422+ 30 ,
423+ dtype. clone ( ) ,
424+ OwnedLayoutChildren :: layout_children ( vec ! [ nested( 0 ) , nested( 2 ) , nested( 4 ) ] ) ,
425+ )
426+ . into_layout ( )
427+ }
428+
/// Checks the split points that `SplitBy::Layout` reports for the nested
/// chunked fixture built by `nested_chunked_layout`.
///
/// Cases:
/// - `0..30`: every multiple of 5 from 0 to 30 is expected.
/// - `7..23`: the two endpoints of the requested range plus the multiples of 5
///   strictly inside it (10, 15, 20) are expected; 5 and 25 lie outside the
///   range and must not appear.
429+ #[ rstest]
430+ #[ case( 0 ..30 , [ 0 , 5 , 10 , 15 , 20 , 25 , 30 ] ) ]
431+ #[ case( 7 ..23 , [ 7 , 10 , 15 , 20 , 23 ] ) ]
432+ fn test_nested_chunked_layout_splits (
433+ #[ case] row_range : std:: ops:: Range < u64 > ,
434+ #[ case] expected : impl IntoIterator < Item = u64 > ,
435+ ) {
436+ let layout = nested_chunked_layout ( ) ;
// The reader is backed by a default (empty) `TestSegments`; presumably split
// registration never reads segment data — verify if this test starts failing on I/O.
437+ let reader = layout
438+ . new_reader ( "" . into ( ) , Arc :: new ( TestSegments :: default ( ) ) , & SESSION )
439+ . unwrap ( ) ;
440+
// `FieldMask::All` requests splits without restricting to particular fields.
441+ let splits = SplitBy :: Layout
442+ . splits ( reader. as_ref ( ) , & row_range, & [ FieldMask :: All ] )
443+ . unwrap ( ) ;
444+
// `expected` is collected into whatever collection type `splits` has, so the
// comparison follows that collection's equality semantics.
445+ assert_eq ! ( splits, expected. into_iter( ) . collect( ) ) ;
446+ }
447+
398448 #[ rstest]
399449 fn test_chunked_evaluator (
400450 #[ from( chunked_layout) ] ( segments, layout) : ( Arc < dyn SegmentSource > , LayoutRef ) ,
0 commit comments