@@ -52,8 +52,7 @@ use datafusion_common::stats::Precision;
5252use datafusion_common:: tree_node:: TreeNodeRecursion ;
5353use datafusion_common:: utils:: transpose;
5454use datafusion_common:: {
55- ColumnStatistics , DataFusionError , HashMap , assert_or_internal_err,
56- internal_datafusion_err, internal_err,
55+ ColumnStatistics , DataFusionError , HashMap , assert_or_internal_err, internal_err,
5756} ;
5857use datafusion_common:: { Result , not_impl_err} ;
5958use datafusion_common_runtime:: SpawnedTask ;
@@ -681,46 +680,8 @@ impl BatchPartitioner {
681680 // Finished building index-arrays for output partitions
682681 timer. done ( ) ;
683682
684- // Borrowing partitioner timer to prevent moving `self` to closure
685- let partitioner_timer = & self . timer ;
686-
687- let mut partitioned_batches = vec ! [ ] ;
688- for ( partition, p_indices) in indices. iter_mut ( ) . enumerate ( ) {
689- if !p_indices. is_empty ( ) {
690- let taken_indices = std:: mem:: take ( p_indices) ;
691- let indices_array: PrimitiveArray < UInt32Type > =
692- taken_indices. into ( ) ;
693-
694- // Tracking time required for repartitioned batches construction
695- let _timer = partitioner_timer. timer ( ) ;
696-
697- // Produce batches based on indices
698- let columns =
699- take_arrays ( batch. columns ( ) , & indices_array, None ) ?;
700-
701- let mut options = RecordBatchOptions :: new ( ) ;
702- options = options. with_row_count ( Some ( indices_array. len ( ) ) ) ;
703- let batch = RecordBatch :: try_new_with_options (
704- batch. schema ( ) ,
705- columns,
706- & options,
707- )
708- . unwrap ( ) ;
709-
710- partitioned_batches. push ( Ok ( ( partition, batch) ) ) ;
711-
712- // Return the taken vec
713- let ( _, buffer, _) = indices_array. into_parts ( ) ;
714- let mut vec =
715- buffer. into_inner ( ) . into_vec :: < u32 > ( ) . map_err ( |e| {
716- internal_datafusion_err ! (
717- "Could not convert buffer to vec: {e:?}"
718- )
719- } ) ?;
720- vec. clear ( ) ;
721- * p_indices = vec;
722- }
723- }
683+ let partitioned_batches =
684+ Self :: partition_grouped_take ( & batch, indices, & self . timer ) ?;
724685
725686 Box :: new ( partitioned_batches. into_iter ( ) )
726687 }
@@ -736,6 +697,66 @@ impl BatchPartitioner {
736697 BatchPartitionerState :: Hash { indices, .. } => indices. len ( ) ,
737698 }
738699 }
700+
701+ /// Build repartitioned hash output batches using one `take` per input batch.
702+ ///
703+ /// The hash router first fills one index vector per output partition. This method
704+ /// concatenates those index vectors, performs one grouped `take_arrays`, and
705+ /// then returns each output partition as a slice of the reordered batch.
706+ ///
707+ /// For example, given partition indices:
708+ ///
709+ /// ```text
710+ /// partition 0: [2, 5]
711+ /// partition 1: []
712+ /// partition 2: [0, 3, 4]
713+ /// ```
714+ ///
715+ /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns
716+ /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`.
717+ fn partition_grouped_take (
718+ batch : & RecordBatch ,
719+ indices : & mut [ Vec < u32 > ] ,
720+ timer : & metrics:: Time ,
721+ ) -> Result < Vec < Result < ( usize , RecordBatch ) > > > {
722+ let mut partition_ranges = Vec :: with_capacity ( indices. len ( ) ) ;
723+ let mut reordered_indices = Vec :: with_capacity ( batch. num_rows ( ) ) ;
724+
725+ for ( partition, p_indices) in indices. iter_mut ( ) . enumerate ( ) {
726+ if p_indices. is_empty ( ) {
727+ continue ;
728+ }
729+
730+ let start = reordered_indices. len ( ) ;
731+ reordered_indices. extend_from_slice ( p_indices) ;
732+ partition_ranges. push ( ( partition, start, p_indices. len ( ) ) ) ;
733+ p_indices. clear ( ) ;
734+ }
735+
736+ if reordered_indices. is_empty ( ) {
737+ return Ok ( vec ! [ ] ) ;
738+ }
739+
740+ let batches = {
741+ let _timer = timer. timer ( ) ;
742+ let indices_array: PrimitiveArray < UInt32Type > = reordered_indices. into ( ) ;
743+ let columns = take_arrays ( batch. columns ( ) , & indices_array, None ) ?;
744+
745+ let mut options = RecordBatchOptions :: new ( ) ;
746+ options = options. with_row_count ( Some ( indices_array. len ( ) ) ) ;
747+ let reordered_batch =
748+ RecordBatch :: try_new_with_options ( batch. schema ( ) , columns, & options) ?;
749+
750+ partition_ranges
751+ . into_iter ( )
752+ . map ( |( partition, start, len) | {
753+ Ok ( ( partition, reordered_batch. slice ( start, len) ) )
754+ } )
755+ . collect ( )
756+ } ;
757+
758+ Ok ( batches)
759+ }
739760}
740761
741762/// Maps `N` input partitions to `M` output partitions based on a
0 commit comments