33
44use std:: fmt:: Debug ;
55use std:: hash:: Hash ;
6-
6+ use std:: sync:: Arc ;
7+
8+ use arrow_array:: ArrayRef as ArrowArrayRef ;
9+ use arrow_array:: RunArray ;
10+ use arrow_array:: cast:: AsArray ;
11+ use arrow_array:: types:: * ;
12+ use arrow_buffer:: ArrowNativeType ;
13+ use arrow_schema:: DataType ;
14+ use arrow_schema:: Field ;
715use vortex_array:: ArrayEq ;
816use vortex_array:: ArrayHash ;
917use vortex_array:: ArrayRef ;
@@ -17,6 +25,7 @@ use vortex_array::ProstMetadata;
1725use vortex_array:: SerializeMetadata ;
1826use vortex_array:: arrays:: Primitive ;
1927use vortex_array:: arrays:: VarBinViewArray ;
28+ use vortex_array:: arrow:: ArrowArrayExecutor ;
2029use vortex_array:: buffer:: BufferHandle ;
2130use vortex_array:: dtype:: DType ;
2231use vortex_array:: dtype:: Nullability ;
@@ -203,6 +212,41 @@ impl VTable for RunEnd {
203212 PARENT_KERNELS . execute ( array, parent, child_idx, ctx)
204213 }
205214
215+ fn preferred_arrow_data_type ( array : & RunEndArray ) -> Option < DataType > {
216+ let ends_dt = array. ends ( ) . dtype ( ) . to_arrow_dtype ( ) . ok ( ) ?;
217+ let values_dt = array. values ( ) . dtype ( ) . to_arrow_dtype ( ) . ok ( ) ?;
218+ let values_nullable = array. values ( ) . dtype ( ) . is_nullable ( ) ;
219+ Some ( DataType :: RunEndEncoded (
220+ Arc :: new ( Field :: new ( "run_ends" , ends_dt, false ) ) ,
221+ Arc :: new ( Field :: new ( "values" , values_dt, values_nullable) ) ,
222+ ) )
223+ }
224+
225+ fn to_arrow_array (
226+ array : & RunEndArray ,
227+ data_type : & DataType ,
228+ ctx : & mut ExecutionCtx ,
229+ ) -> VortexResult < Option < ArrowArrayRef > > {
230+ let DataType :: RunEndEncoded ( ends_field, values_field) = data_type else {
231+ return Ok ( None ) ;
232+ } ;
233+ let arrow_ends = array
234+ . ends ( )
235+ . clone ( )
236+ . execute_arrow ( Some ( ends_field. data_type ( ) ) , ctx) ?;
237+ let arrow_values = array
238+ . values ( )
239+ . clone ( )
240+ . execute_arrow ( Some ( values_field. data_type ( ) ) , ctx) ?;
241+ Ok ( Some ( build_run_array (
242+ & arrow_ends,
243+ & arrow_values,
244+ ends_field. data_type ( ) ,
245+ array. offset ( ) ,
246+ array. len ( ) ,
247+ ) ?) )
248+ }
249+
206250 fn execute ( array : & Self :: Array , ctx : & mut ExecutionCtx ) -> VortexResult < ExecutionStep > {
207251 run_end_canonicalize ( array, ctx) . map ( ExecutionStep :: Done )
208252 }
@@ -503,6 +547,59 @@ pub(super) fn run_end_canonicalize(
503547 } )
504548}
505549
550+ fn build_run_array (
551+ ends : & ArrowArrayRef ,
552+ values : & ArrowArrayRef ,
553+ ends_type : & DataType ,
554+ offset : usize ,
555+ length : usize ,
556+ ) -> VortexResult < ArrowArrayRef > {
557+ match ends_type {
558+ DataType :: Int16 => build_run_array_typed :: < Int16Type > ( ends, values, offset, length) ,
559+ DataType :: Int32 => build_run_array_typed :: < Int32Type > ( ends, values, offset, length) ,
560+ DataType :: Int64 => build_run_array_typed :: < Int64Type > ( ends, values, offset, length) ,
561+ _ => vortex_bail ! ( "Unsupported run-end index type: {:?}" , ends_type) ,
562+ }
563+ }
564+
565+ fn build_run_array_typed < R : RunEndIndexType > (
566+ ends : & ArrowArrayRef ,
567+ values : & ArrowArrayRef ,
568+ offset : usize ,
569+ length : usize ,
570+ ) -> VortexResult < ArrowArrayRef >
571+ where
572+ R :: Native : std:: ops:: Sub < Output = R :: Native > + Ord ,
573+ {
574+ let offset_native = R :: Native :: from_usize ( offset) . ok_or_else ( || {
575+ vortex_error:: vortex_err!( "Offset {offset} exceeds run-end index capacity" )
576+ } ) ?;
577+ let length_native = R :: Native :: from_usize ( length) . ok_or_else ( || {
578+ vortex_error:: vortex_err!( "Length {length} exceeds run-end index capacity" )
579+ } ) ?;
580+
581+ let ends_prim = ends. as_primitive :: < R > ( ) ;
582+ if offset == 0 && ends_prim. values ( ) . last ( ) == Some ( & length_native) {
583+ return Ok ( Arc :: new ( RunArray :: < R > :: try_new ( ends_prim, values) ?) as ArrowArrayRef ) ;
584+ }
585+
586+ // Trim to only include runs covering the [offset, offset+length) range.
587+ let num_runs = ( ends_prim
588+ . values ( )
589+ . partition_point ( |& e| e - offset_native < length_native)
590+ + 1 )
591+ . min ( ends_prim. len ( ) ) ;
592+
593+ let trimmed_ends = ends. slice ( 0 , num_runs) ;
594+ let trimmed_values = values. slice ( 0 , num_runs) ;
595+
596+ let adjusted = trimmed_ends
597+ . as_primitive :: < R > ( )
598+ . unary ( |end| ( end - offset_native) . min ( length_native) ) ;
599+
600+ Ok ( Arc :: new ( RunArray :: < R > :: try_new ( & adjusted, & trimmed_values) ?) as ArrowArrayRef )
601+ }
602+
506603#[ cfg( test) ]
507604mod tests {
508605 use vortex_array:: IntoArray ;
0 commit comments