diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index a1a636cfef9af..f59c7d3bcc6db 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -556,8 +556,190 @@ fn push_down_join( push_down_all_join(predicates, inferred_join_predicates, join, on_filters) } +/// Tracks column equivalence classes using a union-find algorithm. +/// +/// Used for transitive predicate propagation: if `a.x = b.y` and `b.y = c.z`, +/// then `a.x`, `b.y`, and `c.z` are all in the same equivalence class, and a +/// predicate on any one of them can be inferred for the others. +#[derive(Default)] +struct ColumnEquivalences { + parent: HashMap, +} + +impl ColumnEquivalences { + /// Find the root representative for a column, with path compression. + fn find(&mut self, col: &Column) -> Column { + if !self.parent.contains_key(col) { + return col.clone(); + } + let p = self.parent[col].clone(); + if &p == col { + return p; + } + let root = self.find(&p); + self.parent.insert(col.clone(), root.clone()); + root + } + + /// Merge the equivalence classes of two columns. + fn union(&mut self, a: &Column, b: &Column) { + let root_a = self.find(a); + let root_b = self.find(b); + if root_a != root_b { + // Ensure both roots are in the map so get_class can discover them + self.parent + .entry(root_a.clone()) + .or_insert_with(|| root_a.clone()); + self.parent.insert(root_b, root_a); + } + } + + /// Get all columns in the same equivalence class as the given column. + fn get_class(&mut self, col: &Column) -> Vec { + let root = self.find(col); + let all_cols: Vec = self.parent.keys().cloned().collect(); + let mut class = Vec::new(); + for c in all_cols { + if self.find(&c) == root { + class.push(c); + } + } + // Ensure the query column is included even if not in the map + if !class.contains(col) { + class.push(col.clone()); + } + class + } + + /// Returns true if no unions have been recorded. + fn is_empty(&self) -> bool { + self.parent.is_empty() + } +} + +/// If the expression is `col_a = col_b`, returns the two column references. +fn as_column_equality(expr: &Expr) -> Option<(&Column, &Column)> { + if let Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = expr + { + let left_col = left.try_as_col()?; + let right_col = right.try_as_col()?; + Some((left_col, right_col)) + } else { + None + } +} + +/// If the expression is a column equality (`col_a = col_b`), add it to the +/// equivalence classes. +fn try_add_column_equality(expr: &Expr, col_equivalences: &mut ColumnEquivalences) { + if let Some((left_col, right_col)) = as_column_equality(expr) { + col_equivalences.union(left_col, right_col); + } +} + +/// Collect column equalities from descendant join ON clauses and filters. +/// +/// Walks the plan subtree and adds column equality pairs to the equivalence +/// classes. Only adds ON-clause equalities from INNER joins (outer join +/// equalities don't unconditionally hold because the non-preserved side can +/// produce NULLs), but recurses through all join types to find nested INNER +/// joins deeper in the tree. Also collects `col = col` equalities from +/// Filter predicates. +/// +/// Stops recursing at nodes that produce new column identities (Projection, +/// Aggregate, SubqueryAlias, Window, Distinct, Union, Unnest) or alter row +/// cardinality (Limit, Sort with fetch). +fn collect_descendant_equalities( + plan: &LogicalPlan, + col_equivalences: &mut ColumnEquivalences, +) { + match plan { + LogicalPlan::Join(join) => { + // Only add equalities from INNER joins (bidirectional equality) + if join.join_type == JoinType::Inner { + for (l, r) in &join.on { + if let (Some(left_col), Some(right_col)) = + (l.try_as_col(), r.try_as_col()) + { + col_equivalences.union(left_col, right_col); + } + } + if let Some(filter) = &join.filter { + for conjunct in split_conjunction(filter) { + try_add_column_equality(conjunct, col_equivalences); + } + } + } + // Recurse through all join types to find nested INNER joins + collect_descendant_equalities(&join.left, col_equivalences); + collect_descendant_equalities(&join.right, col_equivalences); + } + LogicalPlan::Filter(filter) => { + for conjunct in split_conjunction(&filter.predicate) { + try_add_column_equality(conjunct, col_equivalences); + } + collect_descendant_equalities(&filter.input, col_equivalences); + } + LogicalPlan::Repartition(repart) => { + collect_descendant_equalities(&repart.input, col_equivalences); + } + _ => {} + } +} + +/// Infer predicates using equivalence classes built from the join tree. +/// +/// For each non-equality predicate, finds all columns in its equivalence class +/// and generates versions of the predicate with each equivalent column substituted. +fn infer_predicates_from_equivalence_classes( + col_equivalences: &mut ColumnEquivalences, + predicates: &[Expr], + inferred_predicates: &mut InferredPredicates, +) -> Result<()> { + for predicate in predicates { + // Skip column-column equalities — they are used for building equivalence + // classes, not for generating inferred predicates. Processing them would + // produce tautologies like `a.a = a.a`. + if as_column_equality(predicate).is_some() { + continue; + } + + let col_refs = predicate.column_refs(); + + // Only apply single-column replacement to predicates that reference + // exactly one column. Multi-column predicates (like `t1.a + t2.b > 5`) + // that reference columns from both join sides are handled by + // infer_join_predicates_impl which replaces all columns simultaneously. + // Replacing one column at a time in such predicates would produce + // unintended single-side predicates. + if col_refs.len() != 1 { + continue; + } + + let col = col_refs.iter().next().unwrap(); + let class = col_equivalences.get_class(col); + for equiv_col in &class { + if equiv_col != *col { + let replace_map = HashMap::from([(*col, equiv_col)]); + inferred_predicates + .try_build_predicate(predicate.clone(), &replace_map)?; + } + } + } + Ok(()) +} + /// Extracts any equi-join join predicates from the given filter expressions. /// +/// Uses equivalence classes built from the current join's keys and descendant +/// INNER join keys to enable transitive predicate propagation. For example, +/// given `a.x = b.y` (current join) and `b.y = c.z` (descendant join) and +/// predicate `a.x > 5`, this will infer both `b.y > 5` and `c.z > 5`. +/// /// Parameters /// * `join` the join in question /// @@ -585,12 +767,32 @@ fn infer_join_predicates( let mut inferred_predicates = InferredPredicates::new(join_type); - infer_join_predicates_from_predicates( - &join_col_keys, + // Build equivalence classes from the current join's keys and descendant joins + let mut col_equivalences = ColumnEquivalences::default(); + for (l, r) in &join_col_keys { + col_equivalences.union(l, r); + } + // Collect equalities from descendant inner joins + collect_descendant_equalities(&join.left, &mut col_equivalences); + collect_descendant_equalities(&join.right, &mut col_equivalences); + // Collect column equalities from WHERE predicates and ON filters + for pred in predicates.iter().chain(on_filters.iter()) { + try_add_column_equality(pred, &mut col_equivalences); + } + + if col_equivalences.is_empty() { + // No equivalence classes, nothing to infer + return Ok(vec![]); + } + + // Use equivalence classes for predicate inference from WHERE predicates + infer_predicates_from_equivalence_classes( + &mut col_equivalences, predicates, &mut inferred_predicates, )?; + // Keep existing ON filter inference (respects join type directionality) infer_join_predicates_from_on_filters( &join_col_keys, join_type, @@ -643,26 +845,6 @@ impl InferredPredicates { } } -/// Infer predicates from the pushed down predicates. -/// -/// Parameters -/// * `join_col_keys` column pairs from the join ON clause -/// -/// * `predicates` the pushed down predicates -/// -/// * `inferred_predicates` the inferred results -fn infer_join_predicates_from_predicates( - join_col_keys: &[(&Column, &Column)], - predicates: &[Expr], - inferred_predicates: &mut InferredPredicates, -) -> Result<()> { - infer_join_predicates_impl::( - join_col_keys, - predicates, - inferred_predicates, - ) -} - /// Infer predicates from the join filter. /// /// Parameters @@ -4383,4 +4565,404 @@ mod tests { " ) } + + /// Three-table INNER JOIN chain: a.a = b.a, b.a = c.a, WHERE a.a > 5 + /// Should transitively infer b.a > 5 AND c.a > 5 + #[test] + fn transitive_filter_three_table_inner_join_chain() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + table_b, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("b.a")], + ), + None, + )? + .join( + table_c, + JoinType::Inner, + ( + vec![Column::from_qualified_name("b.a")], + vec![Column::from_qualified_name("c.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // c.a > 5 should be inferred transitively through b.a + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: b.a = c.a + Inner Join: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c, full_filters=[c.a > Int64(5)] + " + ) + } + + /// Four-table INNER JOIN chain: a.a = b.a, b.a = c.a, c.a = d.a, WHERE a.a > 5 + #[test] + fn transitive_filter_four_table_inner_join_chain() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + let table_d = test_table_scan_with_name("d")?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + table_b, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("b.a")], + ), + None, + )? + .join( + table_c, + JoinType::Inner, + ( + vec![Column::from_qualified_name("b.a")], + vec![Column::from_qualified_name("c.a")], + ), + None, + )? + .join( + table_d, + JoinType::Inner, + ( + vec![Column::from_qualified_name("c.a")], + vec![Column::from_qualified_name("d.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: c.a = d.a + Inner Join: b.a = c.a + Inner Join: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c, full_filters=[c.a > Int64(5)] + TableScan: d, full_filters=[d.a > Int64(5)] + " + ) + } + + /// INNER JOIN + LEFT JOIN chain: a INNER JOIN b, b LEFT JOIN c, WHERE a.a > 5 + /// Should infer b.a > 5 (inner join) and c.a > 5 (left join, null-restricting) + #[test] + fn transitive_filter_inner_then_left_join() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + let plan = LogicalPlanBuilder::from( + LogicalPlanBuilder::from(table_a) + .join( + table_b, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("b.a")], + ), + None, + )? + .build()?, + ) + .join( + table_c, + JoinType::Left, + ( + vec![Column::from_qualified_name("b.a")], + vec![Column::from_qualified_name("c.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // c.a > 5 is inferred because > 5 is null-restricting + assert_optimized_plan_equal!( + plan, + @r" + Left Join: b.a = c.a + Inner Join: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c, full_filters=[c.a > Int64(5)] + " + ) + } + + /// Filter-level column equality: WHERE a.a = b.a AND a.a > 5 + /// Should infer b.a > 5 from the WHERE clause equality + #[test] + fn transitive_filter_where_clause_equality() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let plan = LogicalPlanBuilder::from(table_a) + .cross_join(table_b)? + .filter(col("a.a").eq(col("b.a")).and(col("a.a").gt(lit(5i64))))? + .build()?; + + // b.a > 5 should be inferred from the WHERE clause equality a.a = b.a + // Note: the equality ends up as a join filter (not ON condition) because + // ExtractEquijoinPredicate is not run in this test + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: Filter: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: b, full_filters=[b.a > Int64(5)] + " + ) + } + + /// FULL OUTER JOIN: transitive propagation should still work for + /// null-restricting predicates (the InferredPredicates check handles this) + #[test] + fn transitive_filter_full_join_no_transitive() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + table_b, + JoinType::Full, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("b.a")], + ), + None, + )? + .join( + table_c, + JoinType::Inner, + ( + vec![Column::from_qualified_name("b.a")], + vec![Column::from_qualified_name("c.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // a.a > 5 stays above the FULL JOIN (can't push through full join). + // b.a > 5 is inferred at the FULL JOIN level (null-restricting). + // c.a is NOT inferred because the FULL JOIN's ON clause is not + // collected as an equivalence (only INNER joins are collected). + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: b.a = c.a + Filter: a.a > Int64(5) + Full Join: a.a = b.a + TableScan: a + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c + " + ) + } + + /// Self-join: transitive propagation should correctly distinguish + /// between different aliases of the same table + #[test] + fn transitive_filter_self_join() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_a2 = test_table_scan_with_name("a2")?; + let table_a3 = test_table_scan_with_name("a3")?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + table_a2, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("a2.a")], + ), + None, + )? + .join( + table_a3, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a2.a")], + vec![Column::from_qualified_name("a3.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // All three aliases should get the filter + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: a2.a = a3.a + Inner Join: a.a = a2.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: a2, full_filters=[a2.a > Int64(5)] + TableScan: a3, full_filters=[a3.a > Int64(5)] + " + ) + } + + /// Projection boundary: transitive propagation should NOT look through + /// projections since column identities change + #[test] + fn transitive_filter_stops_at_projection() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + // Build: a JOIN (SELECT a FROM b) AS b_proj JOIN c + // The projection between b and the outer join means we should NOT + // collect b's inner join equalities through the projection + let b_proj = LogicalPlanBuilder::from(table_b) + .project(vec![col("a")])? + .build()?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + b_proj, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_name("a")], + ), + None, + )? + .join( + table_c, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("c.a")], + ), + None, + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // The filter should propagate through joins at this level. + // PushDownFilter rewrites the join key through the projection, + // so the filter is pushed through to the table scan. + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: a.a = c.a + Inner Join: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + Projection: b.a + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c, full_filters=[c.a > Int64(5)] + " + ) + } + + /// Complex ON expression: `ON a.x + 1 = b.y` should NOT create + /// equivalences (only simple column refs are handled) + #[test] + fn transitive_filter_complex_on_expression() -> Result<()> { + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + let table_c = test_table_scan_with_name("c")?; + let plan = LogicalPlanBuilder::from(table_a) + .join( + table_b, + JoinType::Inner, + ( + vec![Column::from_qualified_name("a.a")], + vec![Column::from_qualified_name("b.a")], + ), + None, + )? + .join_on( + table_c, + JoinType::Inner, + vec![(col("b.a") + lit(1i64)).eq(col("c.a"))], + )? + .filter(col("a.a").gt(lit(5i64)))? + .build()?; + + // b.a > 5 is inferred (simple column key), but c.a > 5 is NOT + // because the join condition `b.a + 1 = c.a` is not a simple + // column equality + assert_optimized_plan_equal!( + plan, + @r" + Inner Join: Filter: b.a + Int64(1) = c.a + Inner Join: a.a = b.a + TableScan: a, full_filters=[a.a > Int64(5)] + TableScan: b, full_filters=[b.a > Int64(5)] + TableScan: c + " + ) + } + + #[test] + fn test_column_equivalences_basic() { + let col_a = Column::from_qualified_name("t1.a"); + let col_b = Column::from_qualified_name("t2.b"); + let col_c = Column::from_qualified_name("t3.c"); + + let mut eq = ColumnEquivalences::default(); + assert!(eq.is_empty()); + + eq.union(&col_a, &col_b); + assert!(!eq.is_empty()); + + // a and b should be in the same class + let class_a = eq.get_class(&col_a); + assert!(class_a.contains(&col_a)); + assert!(class_a.contains(&col_b)); + assert_eq!(class_a.len(), 2); + + // c should be alone + let class_c = eq.get_class(&col_c); + assert_eq!(class_c, vec![col_c.clone()]); + + // Chain: b = c, now a, b, c should all be in one class + eq.union(&col_b, &col_c); + let class = eq.get_class(&col_a); + assert_eq!(class.len(), 3); + assert!(class.contains(&col_a)); + assert!(class.contains(&col_b)); + assert!(class.contains(&col_c)); + } + + #[test] + fn test_column_equivalences_path_compression() { + let cols: Vec = (0..5) + .map(|i| Column::from_qualified_name(format!("t{i}.x"))) + .collect(); + + let mut eq = ColumnEquivalences::default(); + // Build a chain: t0.x = t1.x, t1.x = t2.x, t2.x = t3.x, t3.x = t4.x + for i in 0..4 { + eq.union(&cols[i], &cols[i + 1]); + } + + // All should be in one class + let class = eq.get_class(&cols[4]); + assert_eq!(class.len(), 5); + for col in &cols { + assert!(class.contains(col)); + } + + // After get_class, path compression should make subsequent finds faster + // (same root for all) + let root = eq.find(&cols[0]); + for col in &cols { + assert_eq!(eq.find(col), root); + } + } } diff --git a/datafusion/sqllogictest/test_files/push_down_filter_transitive.slt b/datafusion/sqllogictest/test_files/push_down_filter_transitive.slt new file mode 100644 index 0000000000000..b7ab6c42e8e67 --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter_transitive.slt @@ -0,0 +1,224 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for transitive predicate propagation through join chains + +statement ok +set datafusion.explain.logical_plan_only = true; + +# Tables with overlapping 'a' values to test join chains +# t1 ∩ t2 ∩ t3 on a: {1, 2, 5} +# t1 ∩ t2 ∩ t3 ∩ t4 on a: {1, 2, 5} + +statement ok +CREATE TABLE t1(a INT, b INT, c INT) +AS VALUES +(1, 10, 100), +(2, 20, 200), +(3, 30, 300), +(5, 50, 500), +(8, 80, 800); + +statement ok +CREATE TABLE t2(a INT, b INT, c INT) +AS VALUES +(1, 11, 101), +(2, 21, 201), +(3, 31, 301), +(5, 51, 501), +(6, 61, 601); + +statement ok +CREATE TABLE t3(a INT, b INT, c INT) +AS VALUES +(1, 12, 102), +(2, 22, 202), +(5, 52, 502), +(7, 72, 702), +(9, 92, 902); + +statement ok +CREATE TABLE t4(a INT, b INT, c INT) +AS VALUES +(1, 13, 103), +(2, 23, 203), +(3, 33, 303), +(5, 53, 503), +(8, 83, 803); + +#### +# Three-table INNER JOIN chain with transitive predicate +# Filter on t1.a > 3 should propagate to t2 and t3 through: +# t1.a = t2.a AND t2.a = t3.a +#### + +# Verify the optimized plan shows the filter pushed to all three tables +query TT +EXPLAIN SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +WHERE t1.a > 3; +---- +logical_plan +01)Projection: t1.a, t2.b, t3.c +02)--Inner Join: t2.a = t3.a +03)----Inner Join: t1.a = t2.a +04)------Filter: t1.a > Int32(3) +05)--------TableScan: t1 projection=[a] +06)------Filter: t2.a > Int32(3) +07)--------TableScan: t2 projection=[a, b] +08)----Filter: t3.a > Int32(3) +09)------TableScan: t3 projection=[a, c] + +# Verify correct query results +query III +SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +WHERE t1.a > 3; +---- +5 51 502 + +#### +# Four-table INNER JOIN chain +# t1.a = t2.a, t2.a = t3.a, t3.a = t4.a, WHERE t1.a = 1 +# Transitive filter should push t*.a = 1 to ALL four tables +#### + +query TT +EXPLAIN SELECT t1.a, t2.b, t3.b, t4.b +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +JOIN t4 ON t3.a = t4.a +WHERE t1.a = 1; +---- +logical_plan +01)Projection: t1.a, t2.b, t3.b, t4.b +02)--Inner Join: t3.a = t4.a +03)----Projection: t1.a, t2.b, t3.a, t3.b +04)------Inner Join: t2.a = t3.a +05)--------Inner Join: t1.a = t2.a +06)----------Filter: t1.a = Int32(1) +07)------------TableScan: t1 projection=[a] +08)----------Filter: t2.a = Int32(1) +09)------------TableScan: t2 projection=[a, b] +10)--------Filter: t3.a = Int32(1) +11)----------TableScan: t3 projection=[a, b] +12)----Filter: t4.a = Int32(1) +13)------TableScan: t4 projection=[a, b] + +query IIII +SELECT t1.a, t2.b, t3.b, t4.b +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +JOIN t4 ON t3.a = t4.a +WHERE t1.a = 1; +---- +1 11 12 13 + +#### +# Mixed INNER + LEFT JOIN chain +# Filter should propagate through INNER join and into LEFT join right side +# (because > is null-restricting) +#### + +query TT +EXPLAIN SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +LEFT JOIN t3 ON t2.a = t3.a +WHERE t1.a > 3; +---- +logical_plan +01)Projection: t1.a, t2.b, t3.c +02)--Left Join: t2.a = t3.a +03)----Inner Join: t1.a = t2.a +04)------Filter: t1.a > Int32(3) +05)--------TableScan: t1 projection=[a] +06)------Filter: t2.a > Int32(3) +07)--------TableScan: t2 projection=[a, b] +08)----Filter: t3.a > Int32(3) +09)------TableScan: t3 projection=[a, c] + +# LEFT JOIN preserves left side rows even when right has no match +query III rowsort +SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +LEFT JOIN t3 ON t2.a = t3.a +WHERE t1.a > 1; +---- +2 21 202 +3 31 NULL +5 51 502 + +#### +# Verify correctness: three-table join returns same results with and without filter +#### + +query III rowsort +SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a; +---- +1 11 102 +2 21 202 +5 51 502 + +# With filter that matches some rows +query III rowsort +SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +WHERE t1.a >= 2; +---- +2 21 202 +5 51 502 + +# With filter that matches no rows +query III rowsort +SELECT t1.a, t2.b, t3.c +FROM t1 +JOIN t2 ON t1.a = t2.a +JOIN t3 ON t2.a = t3.a +WHERE t1.a > 100; +---- + +#### +# Cleanup +#### + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t3; + +statement ok +DROP TABLE t4; + +statement ok +set datafusion.explain.logical_plan_only = false;