@@ -108,6 +108,12 @@ pub fn interleave(
108108 DataType :: Struct ( fields) => interleave_struct( fields, values, indices) ,
109109 DataType :: List ( field) => interleave_list:: <i32 >( values, indices, field) ,
110110 DataType :: LargeList ( field) => interleave_list:: <i64 >( values, indices, field) ,
111+ DataType :: RunEndEncoded ( r, _) => match r. data_type( ) {
112+ DataType :: Int16 => interleave_run_end:: <Int16Type >( values, indices) ,
113+ DataType :: Int32 => interleave_run_end:: <Int32Type >( values, indices) ,
114+ DataType :: Int64 => interleave_run_end:: <Int64Type >( values, indices) ,
115+ t => unreachable!( "illegal run-end type {t}" ) ,
116+ } ,
111117 _ => interleave_fallback( values, indices)
112118 }
113119}
@@ -411,6 +417,70 @@ fn interleave_list<O: OffsetSizeTrait>(
411417 Ok ( Arc :: new ( list_array) )
412418}
413419
420+ /// Specialized [`interleave`] for [`RunArray`].
421+ fn interleave_run_end < R : RunEndIndexType > (
422+ values : & [ & dyn Array ] ,
423+ indices : & [ ( usize , usize ) ] ,
424+ ) -> Result < ArrayRef , ArrowError > {
425+ if indices. is_empty ( ) {
426+ return Ok ( new_empty_array ( values[ 0 ] . data_type ( ) ) ) ;
427+ }
428+
429+ let n = indices. len ( ) ;
430+ R :: Native :: from_usize ( n) . ok_or_else ( || {
431+ ArrowError :: ComputeError ( format ! (
432+ "interleave_run_end: output length {n} does not fit run-end type"
433+ ) )
434+ } ) ?;
435+
436+ let runs: Vec < & RunArray < R > > = values. iter ( ) . map ( |a| a. as_run :: < R > ( ) ) . collect ( ) ;
437+ let value_arrays: Vec < & dyn Array > = runs. iter ( ) . map ( |r| r. values ( ) . as_ref ( ) ) . collect ( ) ;
438+
439+ // Resolve each (array, logical_row) to (array, physical_row), so we can
440+ // lookup physical indices by batch.
441+ let mut phys_pairs: Vec < ( usize , usize ) > = vec ! [ ( 0 , 0 ) ; n] ;
442+ let mut grouped: Vec < ( Vec < R :: Native > , Vec < usize > ) > =
443+ ( 0 ..runs. len ( ) ) . map ( |_| ( Vec :: new ( ) , Vec :: new ( ) ) ) . collect ( ) ;
444+ for ( out_pos, & ( arr, row) ) in indices. iter ( ) . enumerate ( ) {
445+ let row = R :: Native :: from_usize ( row) . ok_or_else ( || {
446+ ArrowError :: InvalidArgumentError ( format ! (
447+ "interleave_run_end: row index {row} out of range"
448+ ) )
449+ } ) ?;
450+ grouped[ arr] . 0 . push ( row) ;
451+ grouped[ arr] . 1 . push ( out_pos) ;
452+ }
453+ for ( arr_idx, ( logical_rows, out_positions) ) in grouped. into_iter ( ) . enumerate ( ) {
454+ let phys = runs[ arr_idx] . get_physical_indices ( & logical_rows) ?;
455+ for ( p, out_pos) in phys. iter ( ) . zip ( out_positions. iter ( ) ) {
456+ phys_pairs[ * out_pos] = ( arr_idx, * p) ;
457+ }
458+ }
459+
460+ // Coalesce by physical-pair equality only: emit a new run when the
461+ // (array_idx, physical_idx) pair changes between adjacent output rows.
462+ // TODO: We could perform an equality check across sources to extend the
463+ // output run, but we can't call make_comparator from this crate.
464+ let mut run_ends_buf: Vec < R :: Native > = Vec :: with_capacity ( n) ;
465+ let mut dedup_pairs: Vec < ( usize , usize ) > = Vec :: with_capacity ( n) ;
466+ dedup_pairs. push ( phys_pairs[ 0 ] ) ;
467+ for i in 1 ..n {
468+ if phys_pairs[ i] != phys_pairs[ i - 1 ] {
469+ run_ends_buf. push ( R :: Native :: from_usize ( i) . unwrap ( ) ) ;
470+ dedup_pairs. push ( phys_pairs[ i] ) ;
471+ }
472+ }
473+ run_ends_buf. push ( R :: Native :: from_usize ( n) . unwrap ( ) ) ;
474+
475+ let taken_values = interleave ( & value_arrays, & dedup_pairs) ?;
476+ let run_ends = PrimitiveArray :: < R > :: from_iter_values ( run_ends_buf) ;
477+
478+ Ok ( Arc :: new ( RunArray :: < R > :: try_new (
479+ & run_ends,
480+ taken_values. as_ref ( ) ,
481+ ) ?) )
482+ }
483+
414484/// Fallback implementation of interleave using [`MutableArrayData`]
415485fn interleave_fallback (
416486 values : & [ & dyn Array ] ,
0 commit comments