Skip to content

Commit b35c4a7

Browse files
EeshanBembialamb
andcommitted
fix: SanityCheckPlan error with window functions and NVL filter (#20231)
Closes #20194 A query with `ROW_NUMBER() OVER (... ORDER BY CASE WHEN col='0' THEN 1 ELSE 0 END)` combined with a filter `nvl(t2.value_2_3,'0')='0'` fails with a `SanityCheckPlan` error. This worked in 50.3.0 but broke in 52.1.0. **Root cause**: `collect_columns_from_predicate_inner` was extracting equality pairs where neither side was a `Column` (e.g. `nvl(col, '0') = '0'`), creating equivalence classes between complex expressions and literals. `normalize_expr`'s deep traversal would then replace the literal `'0'` inside unrelated sort/window CASE WHEN expressions with the complex NVL expression, corrupting the sort ordering and causing a mismatch between `SortExec`'s reported output ordering and `BoundedWindowAggExec`'s expected ordering. **Fix** (two changes in `filter.rs`): 1. **`collect_columns_from_predicate_inner`**: Only extract equality pairs where at least one side is a `Column` reference. This matches the function's documented intent ("Column-Pairs") and prevents complex-expression-to-literal equivalence classes from being created. 2. **`extend_constants`**: Recognize `Literal` expressions as inherently constant (previously only checked `is_expr_constant` on the input's equivalence properties, which doesn't know about literals). This ensures constant propagation still works for `complex_expr = literal` predicates — e.g. `nvl(col, '0')` is properly marked as constant after the filter. - Unit test `test_collect_columns_skips_non_column_pairs` verifying the filtering logic - Sqllogictest reproducing the exact query from the issue - Full test suites: equivalence tests (51 passed), physical-plan tests (1255 passed), physical-optimizer tests (20 passed) - Manual verification with datafusion-cli running the reproduction query - [x] Unit test for `collect_columns_from_predicate_inner` column filtering - [x] Sqllogictest regression test for #20194 - [x] Existing test suites pass - [x] Manual reproduction query succeeds --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent e5547e2 commit b35c4a7

File tree

2 files changed

+169
-13
lines changed

2 files changed

+169
-13
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 123 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@ use datafusion_common::{
5555
use datafusion_execution::TaskContext;
5656
use datafusion_expr::Operator;
5757
use datafusion_physical_expr::equivalence::ProjectionMapping;
58-
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
58+
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
5959
use datafusion_physical_expr::intervals::utils::check_support;
6060
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6161
use datafusion_physical_expr::{
62-
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
63-
conjunction, split_conjunction,
62+
AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
63+
PhysicalExpr, analyze, conjunction, split_conjunction,
6464
};
6565

6666
use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -243,6 +243,20 @@ impl FilterExec {
243243
})
244244
}
245245

246+
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
247+
/// either already known constant in `input_eqs`, or a `Literal`
248+
/// (which is inherently constant across all partitions).
249+
fn expr_constant_or_literal(
250+
expr: &Arc<dyn PhysicalExpr>,
251+
input_eqs: &EquivalenceProperties,
252+
) -> Option<AcrossPartitions> {
253+
input_eqs.is_expr_constant(expr).or_else(|| {
254+
expr.as_any()
255+
.downcast_ref::<Literal>()
256+
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
257+
})
258+
}
259+
246260
fn extend_constants(
247261
input: &Arc<dyn ExecutionPlan>,
248262
predicate: &Arc<dyn PhysicalExpr>,
@@ -255,18 +269,24 @@ impl FilterExec {
255269
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
256270
&& binary.op() == &Operator::Eq
257271
{
258-
// Filter evaluates to single value for all partitions
259-
if input_eqs.is_expr_constant(binary.left()).is_some() {
260-
let across = input_eqs
261-
.is_expr_constant(binary.right())
262-
.unwrap_or_default();
272+
// Check if either side is constant — either already known
273+
// constant from the input equivalence properties, or a literal
274+
// value (which is inherently constant across all partitions).
275+
let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
276+
let right_const =
277+
Self::expr_constant_or_literal(binary.right(), input_eqs);
278+
279+
if let Some(left_across) = left_const {
280+
// LEFT is constant, so RIGHT must also be constant.
281+
// Use RIGHT's known across value if available, otherwise
282+
// propagate LEFT's (e.g. Uniform from a literal).
283+
let across = right_const.unwrap_or(left_across);
263284
res_constants
264285
.push(ConstExpr::new(Arc::clone(binary.right()), across));
265-
} else if input_eqs.is_expr_constant(binary.right()).is_some() {
266-
let across = input_eqs
267-
.is_expr_constant(binary.left())
268-
.unwrap_or_default();
269-
res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
286+
} else if let Some(right_across) = right_const {
287+
// RIGHT is constant, so LEFT must also be constant.
288+
res_constants
289+
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
270290
}
271291
}
272292
}
@@ -866,6 +886,19 @@ fn collect_columns_from_predicate_inner(
866886
let predicates = split_conjunction(predicate);
867887
predicates.into_iter().for_each(|p| {
868888
if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
889+
// Only extract pairs where at least one side is a Column reference.
890+
// Pairs like `complex_expr = literal` should not create equivalence
891+
// classes — the literal could appear in many unrelated expressions
892+
// (e.g. sort keys), and normalize_expr's deep traversal would
893+
// replace those occurrences with the complex expression, corrupting
894+
// sort orderings. Constant propagation for such pairs is handled
895+
// separately by `extend_constants`.
896+
let has_direct_column_operand =
897+
binary.left().as_any().downcast_ref::<Column>().is_some()
898+
|| binary.right().as_any().downcast_ref::<Column>().is_some();
899+
if !has_direct_column_operand {
900+
return;
901+
}
869902
match binary.op() {
870903
Operator::Eq => {
871904
eq_predicate_columns.push((binary.left(), binary.right()))
@@ -1700,6 +1733,83 @@ mod tests {
17001733
from output schema (c@0) to input schema (c@2)"
17011734
);
17021735

1736+
Ok(())
1737+
}
1738+
/// Regression test for https://github.com/apache/datafusion/issues/20194
1739+
///
1740+
/// `collect_columns_from_predicate_inner` should only extract equality
1741+
/// pairs where at least one side is a Column. Pairs like
1742+
/// `complex_expr = literal` must not create equivalence classes because
1743+
/// `normalize_expr`'s deep traversal would replace the literal inside
1744+
/// unrelated expressions (e.g. sort keys) with the complex expression.
1745+
#[test]
1746+
fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
1747+
let schema = test::aggr_test_schema();
1748+
1749+
// Simulate: nvl(c2, 0) = 0 → (c2 IS DISTINCT FROM 0) = 0
1750+
// Neither side is a Column, so this should NOT be extracted.
1751+
let complex_expr: Arc<dyn PhysicalExpr> = binary(
1752+
col("c2", &schema)?,
1753+
Operator::IsDistinctFrom,
1754+
lit(0u32),
1755+
&schema,
1756+
)?;
1757+
let predicate: Arc<dyn PhysicalExpr> =
1758+
binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
1759+
1760+
let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1761+
assert_eq!(
1762+
0,
1763+
equal_pairs.len(),
1764+
"Should not extract equality pairs where neither side is a Column"
1765+
);
1766+
1767+
// But col = literal should still be extracted
1768+
let predicate: Arc<dyn PhysicalExpr> =
1769+
binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
1770+
let (equal_pairs, _) = collect_columns_from_predicate_inner(&predicate);
1771+
assert_eq!(
1772+
1,
1773+
equal_pairs.len(),
1774+
"Should extract equality pairs where one side is a Column"
1775+
);
1776+
1777+
Ok(())
1778+
}
1779+
1780+
/// Columns with Absent min/max statistics should remain Absent after
1781+
/// FilterExec.
1782+
#[tokio::test]
1783+
async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
1784+
let schema = Schema::new(vec![
1785+
Field::new("a", DataType::Int32, false),
1786+
Field::new("b", DataType::Int32, false),
1787+
]);
1788+
let input = Arc::new(StatisticsExec::new(
1789+
Statistics {
1790+
num_rows: Precision::Inexact(1000),
1791+
total_byte_size: Precision::Absent,
1792+
column_statistics: vec![
1793+
ColumnStatistics::default(),
1794+
ColumnStatistics::default(),
1795+
],
1796+
},
1797+
schema.clone(),
1798+
));
1799+
1800+
let predicate = Arc::new(BinaryExpr::new(
1801+
Arc::new(Column::new("a", 0)),
1802+
Operator::Eq,
1803+
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
1804+
));
1805+
let filter: Arc<dyn ExecutionPlan> =
1806+
Arc::new(FilterExec::try_new(predicate, input)?);
1807+
1808+
let statistics = filter.partition_statistics(None)?;
1809+
let col_b_stats = &statistics.column_statistics[1];
1810+
assert_eq!(col_b_stats.min_value, Precision::Absent);
1811+
assert_eq!(col_b_stats.max_value, Precision::Absent);
1812+
17031813
Ok(())
17041814
}
17051815
}

datafusion/sqllogictest/test_files/window.slt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6081,3 +6081,49 @@ WHERE acctbal > (
60816081
);
60826082
----
60836083
1
6084+
6085+
# Regression test for https://github.com/apache/datafusion/issues/20194
6086+
# Window function with CASE WHEN in ORDER BY combined with NVL filter
6087+
# should not trigger SanityCheckPlan error from equivalence normalization
6088+
# replacing literals in sort expressions with complex filter expressions.
6089+
statement ok
6090+
CREATE TABLE issue_20194_t1 (
6091+
value_1_1 decimal(25) NULL,
6092+
value_1_2 int NULL,
6093+
value_1_3 bigint NULL
6094+
);
6095+
6096+
statement ok
6097+
CREATE TABLE issue_20194_t2 (
6098+
value_2_1 bigint NULL,
6099+
value_2_2 varchar(140) NULL,
6100+
value_2_3 varchar(140) NULL
6101+
);
6102+
6103+
statement ok
6104+
INSERT INTO issue_20194_t1 (value_1_1, value_1_2, value_1_3) VALUES (6774502793, 10040029, 1120);
6105+
6106+
statement ok
6107+
INSERT INTO issue_20194_t2 (value_2_1, value_2_2, value_2_3) VALUES (1120, '0', '0');
6108+
6109+
query RII
6110+
SELECT
6111+
t1.value_1_1, t1.value_1_2,
6112+
ROW_NUMBER() OVER (
6113+
PARTITION BY t1.value_1_1, t1.value_1_2
6114+
ORDER BY
6115+
CASE WHEN t2.value_2_2 = '0' THEN 1 ELSE 0 END ASC,
6116+
CASE WHEN t2.value_2_3 = '0' THEN 1 ELSE 0 END ASC
6117+
) AS ord
6118+
FROM issue_20194_t1 t1
6119+
INNER JOIN issue_20194_t2 t2
6120+
ON t1.value_1_3 = t2.value_2_1
6121+
AND nvl(t2.value_2_3, '0') = '0';
6122+
----
6123+
6774502793 10040029 1
6124+
6125+
statement ok
6126+
DROP TABLE issue_20194_t1;
6127+
6128+
statement ok
6129+
DROP TABLE issue_20194_t2;

0 commit comments

Comments
 (0)