@@ -24,9 +24,9 @@ use crate::make_array::make_array_inner;
2424use crate :: utils:: { align_array_dimensions, check_datatypes, make_scalar_function} ;
2525use arrow:: array:: {
2626 Array , ArrayData , ArrayRef , Capacities , GenericListArray , MutableArrayData ,
27- NullBufferBuilder , OffsetSizeTrait ,
27+ OffsetSizeTrait ,
2828} ;
29- use arrow:: buffer:: OffsetBuffer ;
29+ use arrow:: buffer:: { NullBuffer , OffsetBuffer } ;
3030use arrow:: datatypes:: { DataType , Field } ;
3131use datafusion_common:: Result ;
3232use datafusion_common:: utils:: {
@@ -352,7 +352,7 @@ impl ScalarUDFImpl for ArrayConcat {
352352 }
353353}
354354
355- fn array_concat_inner ( args : & [ ArrayRef ] ) -> Result < ArrayRef > {
355+ pub fn array_concat_inner ( args : & [ ArrayRef ] ) -> Result < ArrayRef > {
356356 if args. is_empty ( ) {
357357 return exec_err ! ( "array_concat expects at least one argument" ) ;
358358 }
@@ -396,58 +396,65 @@ fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
396396 . iter ( )
397397 . map ( |arg| as_generic_list_array :: < O > ( arg) )
398398 . collect :: < Result < Vec < _ > > > ( ) ?;
399- // Assume number of rows is the same for all arrays
400399 let row_count = list_arrays[ 0 ] . len ( ) ;
401400
402- let mut array_lengths = vec ! [ ] ;
403- let mut arrays = vec ! [ ] ;
404- let mut valid = NullBufferBuilder :: new ( row_count) ;
405- for i in 0 ..row_count {
406- let nulls = list_arrays
401+ // Extract underlying values ArrayData from each list array for MutableArrayData.
402+ let values_data: Vec < ArrayData > =
403+ list_arrays. iter ( ) . map ( |la| la. values ( ) . to_data ( ) ) . collect ( ) ;
404+ let values_data_refs: Vec < & ArrayData > = values_data. iter ( ) . collect ( ) ;
405+
406+ // Estimate capacity as the sum of all values arrays' lengths.
407+ let total_capacity: usize = values_data. iter ( ) . map ( |d| d. len ( ) ) . sum ( ) ;
408+
409+ let mut mutable = MutableArrayData :: with_capacities (
410+ values_data_refs,
411+ false ,
412+ Capacities :: Array ( total_capacity) ,
413+ ) ;
414+ let mut offsets: Vec < O > = Vec :: with_capacity ( row_count + 1 ) ;
415+ offsets. push ( O :: zero ( ) ) ;
416+
417+ // Compute the output null buffer: a row is null only if null in ALL input
418+ // arrays. This is the bitwise OR of validity bits (valid if valid in ANY
419+ // input). If any array has no null buffer (all valid), no output row can be
420+ // null.
421+ let nulls = list_arrays
422+ . iter ( )
423+ . filter_map ( |la| la. nulls ( ) )
424+ . collect :: < Vec < _ > > ( ) ;
425+ let valid = if nulls. len ( ) == list_arrays. len ( ) {
426+ nulls
407427 . iter ( )
408- . map ( |arr| arr. is_null ( i) )
409- . collect :: < Vec < _ > > ( ) ;
410-
411- // If all the arrays are null, the concatenated array is null
412- let is_null = nulls. iter ( ) . all ( |& x| x) ;
413- if is_null {
414- array_lengths. push ( 0 ) ;
415- valid. append_null ( ) ;
416- } else {
417- // Get all the arrays on i-th row
418- let values = list_arrays
419- . iter ( )
420- . map ( |arr| arr. value ( i) )
421- . collect :: < Vec < _ > > ( ) ;
422-
423- let elements = values
424- . iter ( )
425- . map ( |a| a. as_ref ( ) )
426- . collect :: < Vec < & dyn Array > > ( ) ;
427-
428- // Concatenated array on i-th row
429- let concatenated_array = arrow:: compute:: concat ( elements. as_slice ( ) ) ?;
430- array_lengths. push ( concatenated_array. len ( ) ) ;
431- arrays. push ( concatenated_array) ;
432- valid. append_non_null ( ) ;
428+ . map ( |n| n. inner ( ) . clone ( ) )
429+ . reduce ( |a, b| & a | & b)
430+ . map ( NullBuffer :: new)
431+ } else {
432+ None
433+ } ;
434+
435+ for row_idx in 0 ..row_count {
436+ for ( arr_idx, list_array) in list_arrays. iter ( ) . enumerate ( ) {
437+ if list_array. is_null ( row_idx) {
438+ continue ;
439+ }
440+ let start = list_array. offsets ( ) [ row_idx] . to_usize ( ) . unwrap ( ) ;
441+ let end = list_array. offsets ( ) [ row_idx + 1 ] . to_usize ( ) . unwrap ( ) ;
442+ if start < end {
443+ mutable. extend ( arr_idx, start, end) ;
444+ }
433445 }
446+ offsets. push ( O :: usize_as ( mutable. len ( ) ) ) ;
434447 }
435- // Assume all arrays have the same data type
436- let data_type = list_arrays[ 0 ] . value_type ( ) ;
437448
438- let elements = arrays
439- . iter ( )
440- . map ( |a| a. as_ref ( ) )
441- . collect :: < Vec < & dyn Array > > ( ) ;
449+ let data_type = list_arrays[ 0 ] . value_type ( ) ;
450+ let data = mutable. freeze ( ) ;
442451
443- let list_arr = GenericListArray :: < O > :: new (
452+ Ok ( Arc :: new ( GenericListArray :: < O > :: try_new (
444453 Arc :: new ( Field :: new_list_field ( data_type, true ) ) ,
445- OffsetBuffer :: from_lengths ( array_lengths) ,
446- Arc :: new ( arrow:: compute:: concat ( elements. as_slice ( ) ) ?) ,
447- valid. finish ( ) ,
448- ) ;
449-
450- Ok ( Arc :: new ( list_arr) )
454+ OffsetBuffer :: new ( offsets. into ( ) ) ,
455+ arrow:: array:: make_array ( data) ,
456+ valid,
457+ ) ?) )
451458}
452459
453460// Kernel functions
0 commit comments