Skip to content

Commit 5af7361

Browse files
EeshanBembi and alamb authored
fix: SanityCheckPlan error with window functions and NVL filter (#20231)
## Which issue does this PR close?

Closes #20194

## Rationale for this change

A query with `ROW_NUMBER() OVER (... ORDER BY CASE WHEN col='0' THEN 1 ELSE 0 END)` combined with a filter `nvl(t2.value_2_3,'0')='0'` fails with a `SanityCheckPlan` error. This worked in 50.3.0 but broke in 52.1.0.

## What changes are included in this PR?

**Root cause**: `collect_columns_from_predicate_inner` was extracting equality pairs where neither side was a `Column` (e.g. `nvl(col, '0') = '0'`), creating equivalence classes between complex expressions and literals. `normalize_expr`'s deep traversal would then replace the literal `'0'` inside unrelated sort/window CASE WHEN expressions with the complex NVL expression, corrupting the sort ordering and causing a mismatch between `SortExec`'s reported output ordering and `BoundedWindowAggExec`'s expected ordering.

**Fix** (two changes in `filter.rs`):

1. **`collect_columns_from_predicate_inner`**: Only extract equality pairs where at least one side is a `Column` reference. This matches the function's documented intent ("Column-Pairs") and prevents complex-expression-to-literal equivalence classes from being created.
2. **`extend_constants`**: Recognize `Literal` expressions as inherently constant (previously only checked `is_expr_constant` on the input's equivalence properties, which doesn't know about literals). This ensures constant propagation still works for `complex_expr = literal` predicates — e.g. `nvl(col, '0')` is properly marked as constant after the filter.

## How was this tested?

- Unit test `test_collect_columns_skips_non_column_pairs` verifying the filtering logic
- Sqllogictest reproducing the exact query from the issue
- Full test suites: equivalence tests (51 passed), physical-plan tests (1255 passed), physical-optimizer tests (20 passed)
- Manual verification with datafusion-cli running the reproduction query

## Test plan

- [x] Unit test for `collect_columns_from_predicate_inner` column filtering
- [x] Sqllogictest regression test for #20194
- [x] Existing test suites pass
- [x] Manual reproduction query succeeds

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 8e02b8e commit 5af7361

2 files changed

Lines changed: 134 additions & 13 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 88 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ use datafusion_common::{
5858
use datafusion_execution::TaskContext;
5959
use datafusion_expr::Operator;
6060
use datafusion_physical_expr::equivalence::ProjectionMapping;
61-
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
61+
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
6262
use datafusion_physical_expr::intervals::utils::check_support;
6363
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6464
use datafusion_physical_expr::{
65-
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
66-
conjunction, split_conjunction,
65+
AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
66+
PhysicalExpr, analyze, conjunction, split_conjunction,
6767
};
6868

6969
use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -349,6 +349,20 @@ impl FilterExec {
349349
})
350350
}
351351

352+
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
353+
/// either already known constant in `input_eqs`, or a `Literal`
354+
/// (which is inherently constant across all partitions).
355+
fn expr_constant_or_literal(
356+
expr: &Arc<dyn PhysicalExpr>,
357+
input_eqs: &EquivalenceProperties,
358+
) -> Option<AcrossPartitions> {
359+
input_eqs.is_expr_constant(expr).or_else(|| {
360+
expr.as_any()
361+
.downcast_ref::<Literal>()
362+
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
363+
})
364+
}
365+
352366
fn extend_constants(
353367
input: &Arc<dyn ExecutionPlan>,
354368
predicate: &Arc<dyn PhysicalExpr>,
@@ -361,18 +375,24 @@ impl FilterExec {
361375
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
362376
&& binary.op() == &Operator::Eq
363377
{
364-
// Filter evaluates to single value for all partitions
365-
if input_eqs.is_expr_constant(binary.left()).is_some() {
366-
let across = input_eqs
367-
.is_expr_constant(binary.right())
368-
.unwrap_or_default();
378+
// Check if either side is constant — either already known
379+
// constant from the input equivalence properties, or a literal
380+
// value (which is inherently constant across all partitions).
381+
let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
382+
let right_const =
383+
Self::expr_constant_or_literal(binary.right(), input_eqs);
384+
385+
if let Some(left_across) = left_const {
386+
// LEFT is constant, so RIGHT must also be constant.
387+
// Use RIGHT's known across value if available, otherwise
388+
// propagate LEFT's (e.g. Uniform from a literal).
389+
let across = right_const.unwrap_or(left_across);
369390
res_constants
370391
.push(ConstExpr::new(Arc::clone(binary.right()), across));
371-
} else if input_eqs.is_expr_constant(binary.right()).is_some() {
372-
let across = input_eqs
373-
.is_expr_constant(binary.left())
374-
.unwrap_or_default();
375-
res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
392+
} else if let Some(right_across) = right_const {
393+
// RIGHT is constant, so LEFT must also be constant.
394+
res_constants
395+
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
376396
}
377397
}
378398
}
@@ -1012,6 +1032,19 @@ fn collect_columns_from_predicate_inner(
10121032
let predicates = split_conjunction(predicate);
10131033
predicates.into_iter().for_each(|p| {
10141034
if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
1035+
// Only extract pairs where at least one side is a Column reference.
1036+
// Pairs like `complex_expr = literal` should not create equivalence
1037+
// classes — the literal could appear in many unrelated expressions
1038+
// (e.g. sort keys), and normalize_expr's deep traversal would
1039+
// replace those occurrences with the complex expression, corrupting
1040+
// sort orderings. Constant propagation for such pairs is handled
1041+
// separately by `extend_constants`.
1042+
let has_direct_column_operand =
1043+
binary.left().as_any().downcast_ref::<Column>().is_some()
1044+
|| binary.right().as_any().downcast_ref::<Column>().is_some();
1045+
if !has_direct_column_operand {
1046+
return;
1047+
}
10151048
match binary.op() {
10161049
Operator::Eq => {
10171050
eq_predicate_columns.push((binary.left(), binary.right()))
@@ -2164,6 +2197,48 @@ mod tests {
21642197
Ok(())
21652198
}
21662199

2200+
/// Regression test for https://github.com/apache/datafusion/issues/20194
2201+
///
2202+
/// `collect_columns_from_predicate_inner` should only extract equality
2203+
/// pairs where at least one side is a Column. Pairs like
2204+
/// `complex_expr = literal` must not create equivalence classes because
2205+
/// `normalize_expr`'s deep traversal would replace the literal inside
2206+
/// unrelated expressions (e.g. sort keys) with the complex expression.
2207+
#[test]
2208+
fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
2209+
let schema = test::aggr_test_schema();
2210+
2211+
// Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0
2212+
// Neither side is a Column, so this should NOT be extracted.
2213+
let complex_expr: Arc<dyn PhysicalExpr> = binary(
2214+
col("c2", &schema)?,
2215+
Operator::IsDistinctFrom,
2216+
lit(0u32),
2217+
&schema,
2218+
)?;
2219+
let predicate: Arc<dyn PhysicalExpr> =
2220+
binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
2221+
2222+
let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2223+
assert_eq!(
2224+
0,
2225+
equal_pairs.len(),
2226+
"Should not extract equality pairs where neither side is a Column"
2227+
);
2228+
2229+
// But col = literal should still be extracted
2230+
let predicate: Arc<dyn PhysicalExpr> =
2231+
binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
2232+
let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
2233+
assert_eq!(
2234+
1,
2235+
equal_pairs.len(),
2236+
"Should extract equality pairs where one side is a Column"
2237+
);
2238+
2239+
Ok(())
2240+
}
2241+
21672242
/// Columns with Absent min/max statistics should remain Absent after
21682243
/// FilterExec.
21692244
#[tokio::test]

datafusion/sqllogictest/test_files/window.slt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6081,3 +6081,49 @@ WHERE acctbal > (
60816081
);
60826082
----
60836083
1
6084+
6085+
# Regression test for https://github.com/apache/datafusion/issues/20194
6086+
# Window function with CASE WHEN in ORDER BY combined with NVL filter
6087+
# should not trigger SanityCheckPlan error from equivalence normalization
6088+
# replacing literals in sort expressions with complex filter expressions.
6089+
statement ok
6090+
CREATE TABLE issue_20194_t1 (
6091+
value_1_1 decimal(25) NULL,
6092+
value_1_2 int NULL,
6093+
value_1_3 bigint NULL
6094+
);
6095+
6096+
statement ok
6097+
CREATE TABLE issue_20194_t2 (
6098+
value_2_1 bigint NULL,
6099+
value_2_2 varchar(140) NULL,
6100+
value_2_3 varchar(140) NULL
6101+
);
6102+
6103+
statement ok
6104+
INSERT INTO issue_20194_t1 (value_1_1, value_1_2, value_1_3) VALUES (6774502793, 10040029, 1120);
6105+
6106+
statement ok
6107+
INSERT INTO issue_20194_t2 (value_2_1, value_2_2, value_2_3) VALUES (1120, '0', '0');
6108+
6109+
query RII
6110+
SELECT
6111+
t1.value_1_1, t1.value_1_2,
6112+
ROW_NUMBER() OVER (
6113+
PARTITION BY t1.value_1_1, t1.value_1_2
6114+
ORDER BY
6115+
CASE WHEN t2.value_2_2 = '0' THEN 1 ELSE 0 END ASC,
6116+
CASE WHEN t2.value_2_3 = '0' THEN 1 ELSE 0 END ASC
6117+
) AS ord
6118+
FROM issue_20194_t1 t1
6119+
INNER JOIN issue_20194_t2 t2
6120+
ON t1.value_1_3 = t2.value_2_1
6121+
AND nvl(t2.value_2_3, '0') = '0';
6122+
----
6123+
6774502793 10040029 1
6124+
6125+
statement ok
6126+
DROP TABLE issue_20194_t1;
6127+
6128+
statement ok
6129+
DROP TABLE issue_20194_t2;

0 commit comments

Comments
 (0)