@@ -37,6 +37,7 @@ use datafusion::physical_plan::{
3737} ;
3838use datafusion_common:: { DataFusionError , internal_err} ;
3939use datafusion_physical_expr:: PhysicalSortExpr ;
40+ use datafusion_physical_expr:: expressions:: { DynamicFilterPhysicalExpr , lit} ;
4041use futures:: { Stream , StreamExt } ;
4142use itertools:: Itertools ;
4243use parking_lot:: RwLock ;
@@ -60,7 +61,6 @@ pub struct PartSortExec {
6061 metrics : ExecutionPlanMetricsSet ,
6162 partition_ranges : Vec < Vec < PartitionRange > > ,
6263 properties : PlanProperties ,
63- filter : Arc < RwLock < TopKDynamicFilters > > ,
6464}
6565
6666impl PartSortExec {
@@ -69,7 +69,6 @@ impl PartSortExec {
6969 limit : Option < usize > ,
7070 partition_ranges : Vec < Vec < PartitionRange > > ,
7171 input : Arc < dyn ExecutionPlan > ,
72- filter : Arc < RwLock < TopKDynamicFilters > > ,
7372 ) -> Self {
7473 let metrics = ExecutionPlanMetricsSet :: new ( ) ;
7574 let properties = input. properties ( ) ;
@@ -87,7 +86,6 @@ impl PartSortExec {
8786 metrics,
8887 partition_ranges,
8988 properties,
90- filter,
9189 }
9290 }
9391
@@ -115,7 +113,6 @@ impl PartSortExec {
115113 input_stream,
116114 self . partition_ranges [ partition] . clone ( ) ,
117115 partition,
118- self . filter . clone ( ) ,
119116 ) ?) as _ ;
120117
121118 Ok ( df_stream)
@@ -172,7 +169,6 @@ impl ExecutionPlan for PartSortExec {
172169 self . limit ,
173170 self . partition_ranges . clone ( ) ,
174171 new_input. clone ( ) ,
175- self . filter . clone ( ) ,
176172 ) ) )
177173 }
178174
@@ -234,7 +230,6 @@ struct PartSortStream {
234230 metrics : BaselineMetrics ,
235231 context : Arc < TaskContext > ,
236232 root_metrics : ExecutionPlanMetricsSet ,
237- filter : Arc < RwLock < TopKDynamicFilters > > ,
238233}
239234
240235impl PartSortStream {
@@ -245,9 +240,11 @@ impl PartSortStream {
245240 input : DfSendableRecordBatchStream ,
246241 partition_ranges : Vec < PartitionRange > ,
247242 partition : usize ,
248- filter : Arc < RwLock < TopKDynamicFilters > > ,
249243 ) -> datafusion_common:: Result < Self > {
250244 let buffer = if let Some ( limit) = limit {
245+ let filter = Arc :: new ( RwLock :: new ( TopKDynamicFilters :: new ( Arc :: new (
246+ DynamicFilterPhysicalExpr :: new ( vec ! [ ] , lit ( true ) ) ,
247+ ) ) ) ) ;
251248 PartSortBuffer :: Top (
252249 TopK :: try_new (
253250 partition,
@@ -258,7 +255,7 @@ impl PartSortStream {
258255 context. session_config ( ) . batch_size ( ) ,
259256 context. runtime_env ( ) ,
260257 & sort. metrics ,
261- filter. clone ( ) ,
258+ filter,
262259 ) ?,
263260 0 ,
264261 )
@@ -283,7 +280,6 @@ impl PartSortStream {
283280 metrics : BaselineMetrics :: new ( & sort. metrics , partition) ,
284281 context,
285282 root_metrics : sort. metrics . clone ( ) ,
286- filter,
287283 } )
288284 }
289285}
@@ -507,6 +503,9 @@ impl PartSortStream {
507503
508504 /// Internal method for sorting `Top` buffer (with limit).
509505 fn sort_top_buffer ( & mut self ) -> datafusion_common:: Result < DfRecordBatch > {
506+ let filter = Arc :: new ( RwLock :: new ( TopKDynamicFilters :: new ( Arc :: new (
507+ DynamicFilterPhysicalExpr :: new ( vec ! [ ] , lit ( true ) ) ,
508+ ) ) ) ) ;
510509 let new_top_buffer = TopK :: try_new (
511510 self . partition ,
512511 self . schema ( ) . clone ( ) ,
@@ -516,7 +515,7 @@ impl PartSortStream {
516515 self . context . session_config ( ) . batch_size ( ) ,
517516 self . context . runtime_env ( ) ,
518517 & self . root_metrics ,
519- self . filter . clone ( ) ,
518+ filter,
520519 ) ?;
521520 let PartSortBuffer :: Top ( top_k, _) =
522521 std:: mem:: replace ( & mut self . buffer , PartSortBuffer :: Top ( new_top_buffer, 0 ) )
@@ -685,7 +684,6 @@ mod test {
685684 use arrow:: json:: ArrayWriter ;
686685 use arrow_schema:: { DataType , Field , Schema , SortOptions , TimeUnit } ;
687686 use common_time:: Timestamp ;
688- use datafusion:: physical_plan:: sorts:: sort:: SortExec ;
689687 use datafusion_physical_expr:: expressions:: Column ;
690688 use futures:: StreamExt ;
691689 use store_api:: region_engine:: PartitionRange ;
@@ -1044,19 +1042,16 @@ mod test {
10441042 cols
10451043 } )
10461044 . collect_vec ( ) ;
1047- let mock_input = Arc :: new ( MockInputExec :: new ( batches, schema. clone ( ) ) ) ;
1045+ let mock_input = MockInputExec :: new ( batches, schema. clone ( ) ) ;
10481046
1049- let expr = PhysicalSortExpr {
1050- expr : Arc :: new ( Column :: new ( "ts" , 0 ) ) ,
1051- options : opt,
1052- } ;
1053- let sort_exec = SortExec :: new ( [ expr. clone ( ) ] . into ( ) , mock_input. clone ( ) ) ;
10541047 let exec = PartSortExec :: new (
1055- expr,
1048+ PhysicalSortExpr {
1049+ expr : Arc :: new ( Column :: new ( "ts" , 0 ) ) ,
1050+ options : opt,
1051+ } ,
10561052 limit,
10571053 vec ! [ ranges. clone( ) ] ,
1058- mock_input,
1059- sort_exec. create_filter ( ) ,
1054+ Arc :: new ( mock_input) ,
10601055 ) ;
10611056
10621057 let exec_stream = exec. execute ( 0 , Arc :: new ( TaskContext :: default ( ) ) ) . unwrap ( ) ;
0 commit comments