1818//! Defines the merge plan for executing partitions in parallel and then merging the results
1919//! into a single partition
2020
21+ use std:: pin:: Pin ;
2122use std:: sync:: Arc ;
23+ use std:: task:: { Context , Poll } ;
2224
2325use super :: metrics:: { BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet } ;
2426use super :: stream:: { ObservedStream , RecordBatchReceiverStream } ;
2527use super :: {
26- DisplayAs , ExecutionPlanProperties , PlanProperties , SendableRecordBatchStream ,
27- Statistics ,
28+ DisplayAs , ExecutionPlanProperties , PlanProperties , RecordBatchStream ,
29+ SendableRecordBatchStream , Statistics ,
2830} ;
31+ use crate :: coalesce:: { LimitedBatchCoalescer , PushBatchStatus } ;
2932use crate :: execution_plan:: { CardinalityEffect , EvaluationType , SchedulingType } ;
3033use crate :: filter_pushdown:: { FilterDescription , FilterPushdownPhase } ;
3134use crate :: projection:: { ProjectionExec , make_with_child} ;
3235use crate :: sort_pushdown:: SortOrderPushdownResult ;
3336use crate :: { DisplayFormatType , ExecutionPlan , Partitioning , check_if_same_properties} ;
3437use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
3538
39+ use arrow:: datatypes:: SchemaRef ;
40+ use arrow:: record_batch:: RecordBatch ;
3641use datafusion_common:: config:: ConfigOptions ;
3742use datafusion_common:: tree_node:: TreeNodeRecursion ;
3843use datafusion_common:: { Result , assert_eq_or_internal_err, internal_err} ;
3944use datafusion_execution:: TaskContext ;
4045use datafusion_physical_expr:: PhysicalExpr ;
46+ use futures:: ready;
47+ use futures:: stream:: { Stream , StreamExt } ;
4148
4249/// Merge execution plan executes partitions in parallel and combines them into a single
4350/// partition. No guarantees are made about the order of the resulting partition.
@@ -209,6 +216,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
209216 let elapsed_compute = baseline_metrics. elapsed_compute ( ) . clone ( ) ;
210217 let _timer = elapsed_compute. timer ( ) ;
211218
219+ let batch_size = context. session_config ( ) . batch_size ( ) ;
220+
212221 // use a stream that allows each sender to put in at
213222 // least one result in an attempt to maximize
214223 // parallelism.
@@ -226,11 +235,23 @@ impl ExecutionPlan for CoalescePartitionsExec {
226235 }
227236
228237 let stream = builder. build ( ) ;
229- Ok ( Box :: pin ( ObservedStream :: new (
230- stream,
231- baseline_metrics,
232- self . fetch ,
233- ) ) )
238+ // Coalesce small batches from multiple partitions into
239+ // larger batches of target_batch_size. This improves
240+ // downstream performance (e.g. hash join build side
241+ // benefits from fewer, larger batches).
242+ Ok ( Box :: pin ( CoalescedStream {
243+ input : Box :: pin ( ObservedStream :: new (
244+ stream,
245+ baseline_metrics,
246+ self . fetch ,
247+ ) ) ,
248+ coalescer : LimitedBatchCoalescer :: new (
249+ self . schema ( ) ,
250+ batch_size,
251+ None , // fetch is already handled by ObservedStream
252+ ) ,
253+ completed : false ,
254+ } ) )
234255 }
235256 }
236257 }
@@ -347,6 +368,53 @@ impl ExecutionPlan for CoalescePartitionsExec {
347368 }
348369}
349370
371+ /// Stream that coalesces small batches into larger ones using
372+ /// [`LimitedBatchCoalescer`].
373+ struct CoalescedStream {
374+ input : SendableRecordBatchStream ,
375+ coalescer : LimitedBatchCoalescer ,
376+ completed : bool ,
377+ }
378+
379+ impl Stream for CoalescedStream {
380+ type Item = Result < RecordBatch > ;
381+
382+ fn poll_next (
383+ mut self : Pin < & mut Self > ,
384+ cx : & mut Context < ' _ > ,
385+ ) -> Poll < Option < Self :: Item > > {
386+ loop {
387+ if let Some ( batch) = self . coalescer . next_completed_batch ( ) {
388+ return Poll :: Ready ( Some ( Ok ( batch) ) ) ;
389+ }
390+ if self . completed {
391+ return Poll :: Ready ( None ) ;
392+ }
393+ let input_batch = ready ! ( self . input. poll_next_unpin( cx) ) ;
394+ match input_batch {
395+ None => {
396+ self . completed = true ;
397+ self . coalescer . finish ( ) ?;
398+ }
399+ Some ( Ok ( batch) ) => match self . coalescer . push_batch ( batch) ? {
400+ PushBatchStatus :: Continue => { }
401+ PushBatchStatus :: LimitReached => {
402+ self . completed = true ;
403+ self . coalescer . finish ( ) ?;
404+ }
405+ } ,
406+ other => return Poll :: Ready ( other) ,
407+ }
408+ }
409+ }
410+ }
411+
412+ impl RecordBatchStream for CoalescedStream {
413+ fn schema ( & self ) -> SchemaRef {
414+ self . coalescer . schema ( )
415+ }
416+ }
417+
350418#[ cfg( test) ]
351419mod tests {
352420 use super :: * ;
@@ -378,10 +446,9 @@ mod tests {
378446 1
379447 ) ;
380448
381- // the result should contain 4 batches (one per input partition )
449+ // the result should contain all rows (coalesced into fewer batches )
382450 let iter = merge. execute ( 0 , task_ctx) ?;
383451 let batches = common:: collect ( iter) . await ?;
384- assert_eq ! ( batches. len( ) , num_partitions) ;
385452
386453 // there should be a total of 400 rows (100 per each partition)
387454 let row_count: usize = batches. iter ( ) . map ( |batch| batch. num_rows ( ) ) . sum ( ) ;
0 commit comments