@@ -57,12 +57,12 @@ use datafusion_common::{
5757use datafusion_execution:: TaskContext ;
5858use datafusion_expr:: Operator ;
5959use datafusion_physical_expr:: equivalence:: ProjectionMapping ;
60- use datafusion_physical_expr:: expressions:: { BinaryExpr , Column , lit} ;
60+ use datafusion_physical_expr:: expressions:: { BinaryExpr , Column , Literal , lit} ;
6161use datafusion_physical_expr:: intervals:: utils:: check_support;
6262use datafusion_physical_expr:: utils:: { collect_columns, reassign_expr_columns} ;
6363use datafusion_physical_expr:: {
64- AcrossPartitions , AnalysisContext , ConstExpr , ExprBoundaries , PhysicalExpr , analyze ,
65- conjunction, split_conjunction,
64+ AcrossPartitions , AnalysisContext , ConstExpr , EquivalenceProperties , ExprBoundaries ,
65+ PhysicalExpr , analyze , conjunction, split_conjunction,
6666} ;
6767
6868use datafusion_physical_expr_common:: physical_expr:: fmt_sql;
@@ -348,6 +348,20 @@ impl FilterExec {
348348 } )
349349 }
350350
351+ /// Returns the `AcrossPartitions` value for `expr` if it is constant:
352+ /// either already known constant in `input_eqs`, or a `Literal`
353+ /// (which is inherently constant across all partitions).
354+ fn expr_constant_or_literal (
355+ expr : & Arc < dyn PhysicalExpr > ,
356+ input_eqs : & EquivalenceProperties ,
357+ ) -> Option < AcrossPartitions > {
358+ input_eqs. is_expr_constant ( expr) . or_else ( || {
359+ expr. as_any ( )
360+ . downcast_ref :: < Literal > ( )
361+ . map ( |l| AcrossPartitions :: Uniform ( Some ( l. value ( ) . clone ( ) ) ) )
362+ } )
363+ }
364+
351365 fn extend_constants (
352366 input : & Arc < dyn ExecutionPlan > ,
353367 predicate : & Arc < dyn PhysicalExpr > ,
@@ -360,18 +374,24 @@ impl FilterExec {
360374 if let Some ( binary) = conjunction. as_any ( ) . downcast_ref :: < BinaryExpr > ( )
361375 && binary. op ( ) == & Operator :: Eq
362376 {
363- // Filter evaluates to single value for all partitions
364- if input_eqs. is_expr_constant ( binary. left ( ) ) . is_some ( ) {
365- let across = input_eqs
366- . is_expr_constant ( binary. right ( ) )
367- . unwrap_or_default ( ) ;
377+ // Check if either side is constant — either already known
378+ // constant from the input equivalence properties, or a literal
379+ // value (which is inherently constant across all partitions).
380+ let left_const = Self :: expr_constant_or_literal ( binary. left ( ) , input_eqs) ;
381+ let right_const =
382+ Self :: expr_constant_or_literal ( binary. right ( ) , input_eqs) ;
383+
384+ if let Some ( left_across) = left_const {
385+ // LEFT is constant, so RIGHT must also be constant.
386+ // Use RIGHT's known across value if available, otherwise
387+ // propagate LEFT's (e.g. Uniform from a literal).
388+ let across = right_const. unwrap_or ( left_across) ;
368389 res_constants
369390 . push ( ConstExpr :: new ( Arc :: clone ( binary. right ( ) ) , across) ) ;
370- } else if input_eqs. is_expr_constant ( binary. right ( ) ) . is_some ( ) {
371- let across = input_eqs
372- . is_expr_constant ( binary. left ( ) )
373- . unwrap_or_default ( ) ;
374- res_constants. push ( ConstExpr :: new ( Arc :: clone ( binary. left ( ) ) , across) ) ;
391+ } else if let Some ( right_across) = right_const {
392+ // RIGHT is constant, so LEFT must also be constant.
393+ res_constants
394+ . push ( ConstExpr :: new ( Arc :: clone ( binary. left ( ) ) , right_across) ) ;
375395 }
376396 }
377397 }
@@ -1003,6 +1023,19 @@ fn collect_columns_from_predicate_inner(
10031023 let predicates = split_conjunction ( predicate) ;
10041024 predicates. into_iter ( ) . for_each ( |p| {
10051025 if let Some ( binary) = p. as_any ( ) . downcast_ref :: < BinaryExpr > ( ) {
1026+ // Only extract pairs where at least one side is a Column reference.
1027+ // Pairs like `complex_expr = literal` should not create equivalence
1028+ // classes — the literal could appear in many unrelated expressions
1029+ // (e.g. sort keys), and normalize_expr's deep traversal would
1030+ // replace those occurrences with the complex expression, corrupting
1031+ // sort orderings. Constant propagation for such pairs is handled
1032+ // separately by `extend_constants`.
1033+ let has_direct_column_operand =
1034+ binary. left ( ) . as_any ( ) . downcast_ref :: < Column > ( ) . is_some ( )
1035+ || binary. right ( ) . as_any ( ) . downcast_ref :: < Column > ( ) . is_some ( ) ;
1036+ if !has_direct_column_operand {
1037+ return ;
1038+ }
10061039 match binary. op ( ) {
10071040 Operator :: Eq => {
10081041 eq_predicate_columns. push ( ( binary. left ( ) , binary. right ( ) ) )
@@ -2155,6 +2188,48 @@ mod tests {
21552188 Ok ( ( ) )
21562189 }
21572190
2191+ /// Regression test for https://github.com/apache/datafusion/issues/20194
2192+ ///
2193+ /// `collect_columns_from_predicate_inner` should only extract equality
2194+ /// pairs where at least one side is a Column. Pairs like
2195+ /// `complex_expr = literal` must not create equivalence classes because
2196+ /// `normalize_expr`'s deep traversal would replace the literal inside
2197+ /// unrelated expressions (e.g. sort keys) with the complex expression.
2198+ #[ test]
2199+ fn test_collect_columns_skips_non_column_pairs ( ) -> Result < ( ) > {
2200+ let schema = test:: aggr_test_schema ( ) ;
2201+
2202+ // Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0
2203+ // Neither side is a Column, so this should NOT be extracted.
2204+ let complex_expr: Arc < dyn PhysicalExpr > = binary (
2205+ col ( "c2" , & schema) ?,
2206+ Operator :: IsDistinctFrom ,
2207+ lit ( 0u32 ) ,
2208+ & schema,
2209+ ) ?;
2210+ let predicate: Arc < dyn PhysicalExpr > =
2211+ binary ( complex_expr, Operator :: Eq , lit ( 0u32 ) , & schema) ?;
2212+
2213+ let ( equal_pairs, _) = collect_columns_from_predicate_inner ( & predicate) ;
2214+ assert_eq ! (
2215+ 0 ,
2216+ equal_pairs. len( ) ,
2217+ "Should not extract equality pairs where neither side is a Column"
2218+ ) ;
2219+
2220+ // But col = literal should still be extracted
2221+ let predicate: Arc < dyn PhysicalExpr > =
2222+ binary ( col ( "c2" , & schema) ?, Operator :: Eq , lit ( 0u32 ) , & schema) ?;
2223+ let ( equal_pairs, _) = collect_columns_from_predicate_inner ( & predicate) ;
2224+ assert_eq ! (
2225+ 1 ,
2226+ equal_pairs. len( ) ,
2227+ "Should extract equality pairs where one side is a Column"
2228+ ) ;
2229+
2230+ Ok ( ( ) )
2231+ }
2232+
21582233 /// Columns with Absent min/max statistics should remain Absent after
21592234 /// FilterExec.
21602235 #[ tokio:: test]
0 commit comments