@@ -22,11 +22,11 @@ use datafusion::error::Result as DFResult;
2222use datafusion:: physical_expr:: expressions:: Column ;
2323use datafusion:: physical_expr:: { LexOrdering , Partitioning } ;
2424use datafusion:: physical_optimizer:: PhysicalOptimizerRule ;
25- use datafusion:: physical_plan:: ExecutionPlan ;
2625use datafusion:: physical_plan:: aggregates:: { AggregateExec , AggregateMode } ;
2726use datafusion:: physical_plan:: repartition:: RepartitionExec ;
2827use datafusion:: physical_plan:: sorts:: sort:: SortExec ;
2928use datafusion:: physical_plan:: sorts:: sort_preserving_merge:: SortPreservingMergeExec ;
29+ use datafusion:: physical_plan:: { ExecutionPlan , ExecutionPlanProperties } ;
3030use quickwit_parquet_engine:: sorted_series:: SORTED_SERIES_COLUMN ;
3131
3232/// Replaces the inner sorted-series hash repartition in rollup plans with a
@@ -93,10 +93,17 @@ fn rewrite_sorted_series_final_aggregate(
9393 }
9494
9595 let ordering = sort. expr ( ) . clone ( ) ;
96- let partition_sort: Arc < dyn ExecutionPlan > = Arc :: new (
97- SortExec :: new ( ordering. clone ( ) , Arc :: clone ( repartition. input ( ) ) )
98- . with_preserve_partitioning ( true ) ,
99- ) ;
96+ let repartition_input = Arc :: clone ( repartition. input ( ) ) ;
97+ let partition_sort: Arc < dyn ExecutionPlan > = if repartition_input
98+ . equivalence_properties ( )
99+ . ordering_satisfy ( ordering. clone ( ) ) ?
100+ {
101+ repartition_input
102+ } else {
103+ Arc :: new (
104+ SortExec :: new ( ordering. clone ( ) , repartition_input) . with_preserve_partitioning ( true ) ,
105+ )
106+ } ;
100107 let merged: Arc < dyn ExecutionPlan > =
101108 Arc :: new ( SortPreservingMergeExec :: new ( ordering, partition_sort) ) ;
102109
@@ -133,7 +140,8 @@ fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool {
133140}
134141
135142fn is_sorted_series_column ( expr : & Arc < dyn datafusion:: physical_expr:: PhysicalExpr > ) -> bool {
136- expr. as_any ( )
137- . downcast_ref :: < Column > ( )
138- . is_some_and ( |column| column. name ( ) == SORTED_SERIES_COLUMN )
143+ match expr. as_any ( ) . downcast_ref :: < Column > ( ) {
144+ Some ( column) => column. name ( ) == SORTED_SERIES_COLUMN ,
145+ None => false ,
146+ }
139147}
0 commit comments