@@ -1454,14 +1454,14 @@ mod tests {
14541454 use arrow:: compute:: SortOptions ;
14551455 use arrow:: datatypes:: * ;
14561456 use datafusion_common:: cast:: as_primitive_array;
1457- use datafusion_common:: test_util:: batches_to_string;
14581457 use datafusion_common:: config:: ConfigOptions ;
1458+ use datafusion_common:: test_util:: batches_to_string;
14591459 use datafusion_common:: { DataFusionError , Result , ScalarValue } ;
14601460 use datafusion_execution:: RecordBatchStream ;
1461+ use datafusion_execution:: config:: SessionConfig ;
14611462 use datafusion_execution:: memory_pool:: {
14621463 GreedyMemoryPool , MemoryConsumer , MemoryPool ,
14631464 } ;
1464- use datafusion_execution:: config:: SessionConfig ;
14651465 use datafusion_execution:: runtime_env:: RuntimeEnvBuilder ;
14661466 use datafusion_physical_expr:: EquivalenceProperties ;
14671467 use datafusion_physical_expr:: expressions:: { Column , Literal } ;
@@ -2938,4 +2938,48 @@ mod tests {
29382938
29392939 Ok ( ( ) )
29402940 }
2941+
2942+ #[ test]
2943+ fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase ( ) -> Result < ( ) > {
2944+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , false ) ] ) ) ;
2945+ let input = Arc :: new ( EmptyExec :: new ( Arc :: clone ( & schema) ) ) ;
2946+ let sort = SortExec :: new (
2947+ [ PhysicalSortExpr :: new_default ( Arc :: new ( Column :: new ( "a" , 0 ) ) ) ] . into ( ) ,
2948+ input,
2949+ )
2950+ . with_fetch ( Some ( 10 ) ) ;
2951+
2952+ // with_fetch(Some(_)) creates the TopK dynamic filter automatically.
2953+ assert ! ( sort. filter. is_some( ) , "TopK filter should be created" ) ;
2954+
2955+ let parent_filter: Arc < dyn PhysicalExpr > = Arc :: new ( Column :: new ( "a" , 0 ) ) ;
2956+ let mut config = ConfigOptions :: new ( ) ;
2957+ config. optimizer . enable_topk_dynamic_filter_pushdown = true ;
2958+
2959+ let desc = sort. gather_filters_for_pushdown (
2960+ FilterPushdownPhase :: Post ,
2961+ vec ! [ parent_filter] ,
2962+ & config,
2963+ ) ?;
2964+
2965+ // Parent filters should be blocked in Post phase when fetch is set.
2966+ let parent_filters = desc. parent_filters ( ) ;
2967+ assert_eq ! ( parent_filters. len( ) , 1 ) ;
2968+ assert_eq ! ( parent_filters[ 0 ] . len( ) , 1 ) ;
2969+ assert ! (
2970+ matches!( parent_filters[ 0 ] [ 0 ] . discriminant, PushedDown :: No ) ,
2971+ "Parent filter should be unsupported in Post phase when sort has fetch"
2972+ ) ;
2973+
2974+ // The TopK self-filter should still be allowed through.
2975+ let self_filters = desc. self_filters ( ) ;
2976+ assert_eq ! ( self_filters. len( ) , 1 ) ;
2977+ assert_eq ! (
2978+ self_filters[ 0 ] . len( ) ,
2979+ 1 ,
2980+ "TopK dynamic self-filter should be pushed down"
2981+ ) ;
2982+
2983+ Ok ( ( ) )
2984+ }
29412985}
0 commit comments