@@ -24,7 +24,8 @@ use crate::physical_optimizer::test_utils::{
2424 coalesce_partitions_exec, create_test_schema, create_test_schema2,
2525 create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
2626 local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec,
27- repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options,
27+ repartition_exec, simple_projection_exec, sort_exec, sort_exec_with_fetch,
28+ sort_exec_with_preserve_partitioning, sort_expr, sort_expr_options,
2829 sort_merge_join_exec, sort_preserving_merge_exec,
2930 sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
3031 union_exec,
@@ -458,6 +459,102 @@ async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Resu
458459 Ok ( ( ) )
459460}
460461
462+ /// Regression test: when `OutputRequirementExec(SinglePartition)` wraps a plan
463+ /// that already contains `SortPreservingMergeExec`, sort pushdown must not add
464+ /// a second `SortPreservingMergeExec` below the existing one.
465+ #[ test]
466+ fn test_no_extra_spm_from_output_requirement_single_partition ( ) -> Result < ( ) > {
467+ let schema = create_test_schema ( ) ?;
468+ let sort_exprs: LexOrdering = [ sort_expr ( "nullable_col" , & schema) ] . into ( ) ;
469+ let requirement = [ PhysicalSortRequirement :: new (
470+ col ( "nullable_col" , & schema) ?,
471+ Some ( SortOptions :: new ( false , true ) ) ,
472+ ) ]
473+ . into ( ) ;
474+
475+ // Plan entering pushdown_sorts:
476+ // OutputRequirementExec (dist=SinglePartition)
477+ // SortPreservingMergeExec [nullable_col@0]
478+ // SortExec [nullable_col@0] (preserve_partitioning=true)
479+ // RepartitionExec (10 partitions)
480+ // DataSource
481+ let source = memory_exec ( & schema) ;
482+ let repartitioned = repartition_exec ( source) ;
483+ let sorted = sort_exec_with_preserve_partitioning ( sort_exprs. clone ( ) , repartitioned) ;
484+ let merged = sort_preserving_merge_exec ( sort_exprs. clone ( ) , sorted) ;
485+ let plan: Arc < dyn ExecutionPlan > = Arc :: new ( OutputRequirementExec :: new (
486+ merged,
487+ Some ( OrderingRequirements :: new ( requirement) ) ,
488+ Distribution :: SinglePartition ,
489+ None ,
490+ ) ) ;
491+
492+ let mut sort_pushdown = SortPushDown :: new_default ( Arc :: clone ( & plan) ) ;
493+ assign_initial_requirements ( & mut sort_pushdown) ;
494+ let result = pushdown_sorts ( sort_pushdown) ?;
495+
496+ // The plan is already optimal; no extra SortPreservingMergeExec should appear.
497+ assert_snapshot ! (
498+ displayable( result. plan. as_ref( ) ) . indent( true ) . to_string( ) ,
499+ @r"
500+ OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
501+ SortPreservingMergeExec: [nullable_col@0 ASC]
502+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
503+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
504+ DataSourceExec: partitions=1, partition_sizes=[0]
505+ "
506+ ) ;
507+ Ok ( ( ) )
508+ }
509+
510+ /// Positive test: when `OutputRequirementExec` carries `SinglePartition` and
511+ /// sort pushdown reaches a multi-partition node through a projection, it must
512+ /// insert both `SortExec(preserve_partitioning=true)` AND
513+ /// `SortPreservingMergeExec` — the core behaviour added by commit 45620e982.
514+ #[ test]
515+ fn test_sort_pushdown_adds_spm_for_single_partition_requirement ( ) -> Result < ( ) > {
516+ let schema = create_test_schema ( ) ?;
517+ let requirement = [ PhysicalSortRequirement :: new (
518+ col ( "nullable_col" , & schema) ?,
519+ Some ( SortOptions :: new ( false , true ) ) ,
520+ ) ]
521+ . into ( ) ;
522+
523+ // Plan entering pushdown_sorts:
524+ // OutputRequirementExec (dist=SinglePartition, order=[nullable_col@0])
525+ // ProjectionExec (identity)
526+ // RepartitionExec (10 partitions)
527+ // DataSource
528+ let source = memory_exec ( & schema) ;
529+ let repartitioned = repartition_exec ( source) ;
530+ let projected = simple_projection_exec ( repartitioned, vec ! [ 0 , 1 ] ) ;
531+ let plan: Arc < dyn ExecutionPlan > = Arc :: new ( OutputRequirementExec :: new (
532+ projected,
533+ Some ( OrderingRequirements :: new ( requirement) ) ,
534+ Distribution :: SinglePartition ,
535+ None ,
536+ ) ) ;
537+
538+ let mut sort_pushdown = SortPushDown :: new_default ( Arc :: clone ( & plan) ) ;
539+ assign_initial_requirements ( & mut sort_pushdown) ;
540+ let result = pushdown_sorts ( sort_pushdown) ?;
541+
542+ // Sort is pushed through the projection; because SinglePartition is
543+ // required, add_sort_above_with_distribution wraps it in SPM.
544+ assert_snapshot ! (
545+ displayable( result. plan. as_ref( ) ) . indent( true ) . to_string( ) ,
546+ @r"
547+ OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
548+ SortPreservingMergeExec: [nullable_col@0 ASC]
549+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
550+ ProjectionExec: expr=[nullable_col@0 as nullable_col, non_nullable_col@1 as non_nullable_col]
551+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
552+ DataSourceExec: partitions=1, partition_sizes=[0]
553+ "
554+ ) ;
555+ Ok ( ( ) )
556+ }
557+
461558async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl (
462559 repartition_sorts : bool ,
463560) -> Result < String > {
0 commit comments