Skip to content

Commit e62533b

Browse files
authored
Substrait join consumer should not merge nullability of join keys (#21121)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21124 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When a Substrait join expression contains both equal and is_not_distinct_from predicates (e.g. Spark pushes a null-safe filter into a join that already has a regular equality key), the `split_eq_and_noneq_join_predicate_with_nulls_equality` function uses a single `nulls_equal_nulls` boolean that gets overwritten per-predicate. Whichever operator is processed last determines the `NullEquality` for all keys, silently dropping NULL-matching rows. Since NullEquality is a join-level setting (not per-key) across all physical join implementations (HashJoinExec, SortMergeJoinExec, SymmetricHashJoinExec), the correct fix is to match DataFusion's own SQL planner behavior: demote IS NOT DISTINCT FROM keys to the join filter when mixed with Eq keys. This is already correctly handled for SQL as shown in [join_is_not_distinct_from.slt:L188](https://sourcegraph.com/r/github.com/apache/datafusion@2b7d4f9a5b005905b23128274ad37c3306ffcd15/-/blob/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt?L188) ``` # Test mixed equal and IS NOT DISTINCT FROM conditions # The `IS NOT DISTINCT FROM` expr should NOT in HashJoin's `on` predicate query TT EXPLAIN SELECT t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id AND t1.val IS NOT DISTINCT FROM t2.val ---- logical_plan 01)Projection: t1.id AS t1_id, t2.id AS t2_id, t1.val, t2.val 02)--Inner Join: t1.id = t2.id Filter: t1.val IS NOT DISTINCT FROM t2.val 03)----TableScan: t1 projection=[id, val] 04)----TableScan: t2 projection=[id, val] ``` ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> `datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs`: - Collect eq_keys and indistinct_keys separately instead of using a single vec with an overwritable boolean - When both are present (mixed case), use eq_keys as equijoin keys with NullEqualsNothing and reconstruct the IsNotDistinctFrom expressions into the join filter - Return NullEquality directly instead of converting from bool ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, three levels of coverage: 1. Unit tests (join_rel.rs) — directly assert the output of split_eq_and_noneq_join_predicate_with_nulls_equality for eq-only, indistinct-only, mixed, and non-column-operand cases 2. Integration test (consumer_integration.rs) — loads a JSON-encoded Substrait plan with a JoinRel containing both operators through from_substrait_plan, executes it, and asserts 6 rows (including NULL=NULL matches) 3. Existing SLT (join_is_not_distinct_from.slt:179-205) — confirms the SQL planner already exhibits the same demotion behavior that this PR adds to the Substrait consumer ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No API changes. Substrait plans with mixed equal/is_not_distinct_from join predicates now correctly preserve null-safe semantics instead of silently dropping NULL-matching rows.
1 parent 4084a18 commit e62533b

5 files changed

Lines changed: 486 additions & 40 deletions

File tree

datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,36 @@ JOIN t4 ON (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT F
291291
2 2 NULL NULL 200 200
292292
3 3 30 30 NULL NULL
293293

294+
# Test mixed: 1 Eq key + multiple IS NOT DISTINCT FROM keys.
295+
# The optimizer unconditionally favours Eq keys (see extract_equijoin_predicate.rs,
296+
# "Only convert when there are NO equijoin predicates, to be conservative").
297+
# All IS NOT DISTINCT FROM predicates should be demoted to filter, even when they outnumber the Eq key.
298+
query TT
299+
EXPLAIN SELECT t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
300+
FROM t3
301+
JOIN t4 ON (t3.id = t4.id) AND (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT FROM t4.val2)
302+
----
303+
logical_plan
304+
01)Projection: t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
305+
02)--Inner Join: t3.id = t4.id Filter: t3.val1 IS NOT DISTINCT FROM t4.val1 AND t3.val2 IS NOT DISTINCT FROM t4.val2
306+
03)----TableScan: t3 projection=[id, val1, val2]
307+
04)----TableScan: t4 projection=[id, val1, val2]
308+
physical_plan
309+
01)ProjectionExec: expr=[id@0 as t3_id, id@3 as t4_id, val1@1 as val1, val1@4 as val1, val2@2 as val2, val2@5 as val2]
310+
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], filter=val1@0 IS NOT DISTINCT FROM val1@2 AND val2@1 IS NOT DISTINCT FROM val2@3
311+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
312+
04)----DataSourceExec: partitions=1, partition_sizes=[1]
313+
314+
# Verify correct results: all 3 rows should match (including NULL=NULL via IS NOT DISTINCT FROM in filter)
315+
query IIIIII rowsort
316+
SELECT t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
317+
FROM t3
318+
JOIN t4 ON (t3.id = t4.id) AND (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT FROM t4.val2)
319+
----
320+
1 1 10 10 100 100
321+
2 2 NULL NULL 200 200
322+
3 3 30 30 NULL NULL
323+
294324
statement ok
295325
drop table t0;
296326

datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs

Lines changed: 146 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::logical_plan::consumer::SubstraitConsumer;
1919
use datafusion::common::{Column, JoinType, NullEquality, not_impl_err, plan_err};
2020
use datafusion::logical_expr::requalify_sides_if_needed;
21-
use datafusion::logical_expr::utils::split_conjunction;
21+
use datafusion::logical_expr::utils::split_conjunction_owned;
2222
use datafusion::logical_expr::{
2323
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
2424
};
@@ -56,15 +56,10 @@ pub async fn from_join_rel(
5656
// So we extract each part as follows:
5757
// - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector
5858
// - Otherwise we add the expression to join_filter (use conjunction if filter already exists)
59-
let (join_ons, nulls_equal_nulls, join_filter) =
60-
split_eq_and_noneq_join_predicate_with_nulls_equality(&on);
59+
let (join_ons, null_equality, join_filter) =
60+
split_eq_and_noneq_join_predicate_with_nulls_equality(on);
6161
let (left_cols, right_cols): (Vec<_>, Vec<_>) =
6262
itertools::multiunzip(join_ons);
63-
let null_equality = if nulls_equal_nulls {
64-
NullEquality::NullEqualsNull
65-
} else {
66-
NullEquality::NullEqualsNothing
67-
};
6863
left.join_detailed(
6964
right.build()?,
7065
join_type,
@@ -89,49 +84,61 @@ pub async fn from_join_rel(
8984
}
9085

9186
fn split_eq_and_noneq_join_predicate_with_nulls_equality(
92-
filter: &Expr,
93-
) -> (Vec<(Column, Column)>, bool, Option<Expr>) {
94-
let exprs = split_conjunction(filter);
87+
filter: Expr,
88+
) -> (Vec<(Column, Column)>, NullEquality, Option<Expr>) {
89+
let exprs = split_conjunction_owned(filter);
9590

96-
let mut accum_join_keys: Vec<(Column, Column)> = vec![];
91+
let mut eq_keys: Vec<(Column, Column)> = vec![];
92+
let mut indistinct_keys: Vec<(Column, Column)> = vec![];
9793
let mut accum_filters: Vec<Expr> = vec![];
98-
let mut nulls_equal_nulls = false;
9994

10095
for expr in exprs {
101-
#[expect(clippy::collapsible_match)]
10296
match expr {
103-
Expr::BinaryExpr(binary_expr) => match binary_expr {
104-
x @ (BinaryExpr {
105-
left,
106-
op: Operator::Eq,
107-
right,
97+
Expr::BinaryExpr(BinaryExpr {
98+
left,
99+
op: op @ (Operator::Eq | Operator::IsNotDistinctFrom),
100+
right,
101+
}) => match (*left, *right) {
102+
(Expr::Column(l), Expr::Column(r)) => match op {
103+
Operator::Eq => eq_keys.push((l, r)),
104+
Operator::IsNotDistinctFrom => indistinct_keys.push((l, r)),
105+
_ => unreachable!(),
106+
},
107+
(left, right) => {
108+
accum_filters.push(Expr::BinaryExpr(BinaryExpr {
109+
left: Box::new(left),
110+
op,
111+
right: Box::new(right),
112+
}));
108113
}
109-
| BinaryExpr {
110-
left,
111-
op: Operator::IsNotDistinctFrom,
112-
right,
113-
}) => {
114-
nulls_equal_nulls = match x.op {
115-
Operator::Eq => false,
116-
Operator::IsNotDistinctFrom => true,
117-
_ => unreachable!(),
118-
};
119-
120-
match (left.as_ref(), right.as_ref()) {
121-
(Expr::Column(l), Expr::Column(r)) => {
122-
accum_join_keys.push((l.clone(), r.clone()));
123-
}
124-
_ => accum_filters.push(expr.clone()),
125-
}
126-
}
127-
_ => accum_filters.push(expr.clone()),
128114
},
129-
_ => accum_filters.push(expr.clone()),
115+
_ => accum_filters.push(expr),
130116
}
131117
}
132118

119+
let (join_keys, null_equality) =
120+
match (eq_keys.is_empty(), indistinct_keys.is_empty()) {
121+
// Mixed: use eq_keys as equijoin keys, demote indistinct keys to filter
122+
(false, false) => {
123+
for (l, r) in indistinct_keys {
124+
accum_filters.push(Expr::BinaryExpr(BinaryExpr {
125+
left: Box::new(Expr::Column(l)),
126+
op: Operator::IsNotDistinctFrom,
127+
right: Box::new(Expr::Column(r)),
128+
}));
129+
}
130+
(eq_keys, NullEquality::NullEqualsNothing)
131+
}
132+
// Only eq keys
133+
(false, true) => (eq_keys, NullEquality::NullEqualsNothing),
134+
// Only indistinct keys
135+
(true, false) => (indistinct_keys, NullEquality::NullEqualsNull),
136+
// No keys at all
137+
(true, true) => (vec![], NullEquality::NullEqualsNothing),
138+
};
139+
133140
let join_filter = accum_filters.into_iter().reduce(Expr::and);
134-
(accum_join_keys, nulls_equal_nulls, join_filter)
141+
(join_keys, null_equality, join_filter)
135142
}
136143

137144
fn from_substrait_jointype(join_type: i32) -> datafusion::common::Result<JoinType> {
@@ -153,3 +160,102 @@ fn from_substrait_jointype(join_type: i32) -> datafusion::common::Result<JoinTyp
153160
plan_err!("invalid join type variant {join_type}")
154161
}
155162
}
163+
164+
#[cfg(test)]
165+
mod tests {
166+
use super::*;
167+
168+
fn col(name: &str) -> Expr {
169+
Expr::Column(Column::from_name(name))
170+
}
171+
172+
fn indistinct(left: Expr, right: Expr) -> Expr {
173+
Expr::BinaryExpr(BinaryExpr {
174+
left: Box::new(left),
175+
op: Operator::IsNotDistinctFrom,
176+
right: Box::new(right),
177+
})
178+
}
179+
180+
fn fmt_keys(keys: &[(Column, Column)]) -> String {
181+
keys.iter()
182+
.map(|(l, r)| format!("{l} = {r}"))
183+
.collect::<Vec<_>>()
184+
.join(", ")
185+
}
186+
187+
#[test]
188+
fn split_only_eq_keys() {
189+
let expr = col("a").eq(col("b"));
190+
let (keys, null_eq, filter) =
191+
split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
192+
193+
assert_eq!(fmt_keys(&keys), "a = b");
194+
assert_eq!(null_eq, NullEquality::NullEqualsNothing);
195+
assert!(filter.is_none());
196+
}
197+
198+
#[test]
199+
fn split_only_indistinct_keys() {
200+
let expr = indistinct(col("a"), col("b"));
201+
let (keys, null_eq, filter) =
202+
split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
203+
204+
assert_eq!(fmt_keys(&keys), "a = b");
205+
assert_eq!(null_eq, NullEquality::NullEqualsNull);
206+
assert!(filter.is_none());
207+
}
208+
209+
/// Regression: mixed `equal` + `is_not_distinct_from` must demote
210+
/// the indistinct key to the join filter so the single NullEquality
211+
/// flag stays consistent (NullEqualsNothing for the eq keys).
212+
#[test]
213+
fn split_mixed_eq_and_indistinct_demotes_indistinct_to_filter() {
214+
let expr =
215+
indistinct(col("val_l"), col("val_r")).and(col("id_l").eq(col("id_r")));
216+
217+
let (keys, null_eq, filter) =
218+
split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
219+
220+
assert_eq!(fmt_keys(&keys), "id_l = id_r");
221+
assert_eq!(null_eq, NullEquality::NullEqualsNothing);
222+
assert_eq!(
223+
filter.unwrap().to_string(),
224+
"val_l IS NOT DISTINCT FROM val_r"
225+
);
226+
}
227+
228+
/// Multiple IS NOT DISTINCT FROM keys with a single Eq key should demote
229+
/// all indistinct keys to the filter.
230+
#[test]
231+
fn split_mixed_multiple_indistinct_demoted() {
232+
let expr = indistinct(col("a_l"), col("a_r"))
233+
.and(indistinct(col("b_l"), col("b_r")))
234+
.and(col("id_l").eq(col("id_r")));
235+
236+
let (keys, null_eq, filter) =
237+
split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
238+
239+
assert_eq!(fmt_keys(&keys), "id_l = id_r");
240+
assert_eq!(null_eq, NullEquality::NullEqualsNothing);
241+
assert_eq!(
242+
filter.unwrap().to_string(),
243+
"a_l IS NOT DISTINCT FROM a_r AND b_l IS NOT DISTINCT FROM b_r"
244+
);
245+
}
246+
247+
#[test]
248+
fn split_non_column_eq_goes_to_filter() {
249+
let expr = Expr::Literal(
250+
datafusion::common::ScalarValue::Utf8(Some("x".into())),
251+
None,
252+
)
253+
.eq(col("b"));
254+
255+
let (keys, _, filter) =
256+
split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
257+
258+
assert!(keys.is_empty());
259+
assert_eq!(filter.unwrap().to_string(), "Utf8(\"x\") = b");
260+
}
261+
}

datafusion/substrait/tests/cases/consumer_integration.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#[cfg(test)]
2626
mod tests {
2727
use crate::utils::test::add_plan_schemas_to_ctx;
28+
use datafusion::arrow::record_batch::RecordBatch;
29+
use datafusion::arrow::util::pretty::pretty_format_batches;
2830
use datafusion::common::Result;
2931
use datafusion::prelude::SessionContext;
3032
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
@@ -33,6 +35,34 @@ mod tests {
3335
use std::io::BufReader;
3436
use substrait::proto::Plan;
3537

38+
async fn execute_plan(name: &str) -> Result<Vec<RecordBatch>> {
39+
let path = format!("tests/testdata/test_plans/{name}");
40+
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
41+
File::open(path).expect("file not found"),
42+
))
43+
.expect("failed to parse json");
44+
let ctx = SessionContext::new();
45+
let plan = from_substrait_plan(&ctx.state(), &proto).await?;
46+
ctx.execute_logical_plan(plan).await?.collect().await
47+
}
48+
49+
/// Pretty-print batches as a table with header on top and data rows sorted.
50+
fn pretty_sorted(batches: &[RecordBatch]) -> String {
51+
let pretty = pretty_format_batches(batches).unwrap().to_string();
52+
let all_lines: Vec<&str> = pretty.trim().lines().collect();
53+
let header = &all_lines[..3];
54+
let mut data: Vec<&str> = all_lines[3..all_lines.len() - 1].to_vec();
55+
data.sort();
56+
let footer = &all_lines[all_lines.len() - 1..];
57+
header
58+
.iter()
59+
.copied()
60+
.chain(data)
61+
.chain(footer.iter().copied())
62+
.collect::<Vec<_>>()
63+
.join("\n")
64+
}
65+
3666
async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
3767
let path =
3868
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
@@ -762,4 +792,80 @@ mod tests {
762792

763793
Ok(())
764794
}
795+
796+
/// Substrait join with both `equal` and `is_not_distinct_from` must demote
797+
/// `IS NOT DISTINCT FROM` to the join filter.
798+
#[tokio::test]
799+
async fn test_mixed_join_equal_and_indistinct_inner_join() -> Result<()> {
800+
let plan_str =
801+
test_plan_to_string("mixed_join_equal_and_indistinct.json").await?;
802+
// Eq becomes the equijoin key; IS NOT DISTINCT FROM is demoted to filter.
803+
assert_snapshot!(
804+
plan_str,
805+
@r#"
806+
Projection: left.id, left.val, left.comment, right.id AS id0, right.val AS val0, right.comment AS comment0
807+
Inner Join: left.id = right.id Filter: left.val IS NOT DISTINCT FROM right.val
808+
SubqueryAlias: left
809+
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
810+
SubqueryAlias: right
811+
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
812+
"#
813+
);
814+
815+
// Execute and verify actual rows, including NULL=NULL matches (ids 3,4).
816+
let results = execute_plan("mixed_join_equal_and_indistinct.json").await?;
817+
assert_snapshot!(pretty_sorted(&results),
818+
@r"
819+
+----+-----+---------+-----+------+----------+
820+
| id | val | comment | id0 | val0 | comment0 |
821+
+----+-----+---------+-----+------+----------+
822+
| 1 | a | c1 | 1 | a | c1 |
823+
| 2 | b | c2 | 2 | b | c2 |
824+
| 3 | | c3 | 3 | | c3 |
825+
| 4 | | c4 | 4 | | c4 |
826+
| 5 | e | c5 | 5 | e | c5 |
827+
| 6 | f | c6 | 6 | f | c6 |
828+
+----+-----+---------+-----+------+----------+
829+
"
830+
);
831+
832+
Ok(())
833+
}
834+
835+
/// Substrait join with both `equal` and `is_not_distinct_from` must demote
836+
/// `IS NOT DISTINCT FROM` to the join filter.
837+
#[tokio::test]
838+
async fn test_mixed_join_equal_and_indistinct_left_join() -> Result<()> {
839+
let plan_str =
840+
test_plan_to_string("mixed_join_equal_and_indistinct_left.json").await?;
841+
assert_snapshot!(
842+
plan_str,
843+
@r#"
844+
Projection: left.id, left.val, left.comment, right.id AS id0, right.val AS val0, right.comment AS comment0
845+
Left Join: left.id = right.id Filter: left.val IS NOT DISTINCT FROM right.val
846+
SubqueryAlias: left
847+
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
848+
SubqueryAlias: right
849+
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
850+
"#
851+
);
852+
853+
let results = execute_plan("mixed_join_equal_and_indistinct_left.json").await?;
854+
assert_snapshot!(pretty_sorted(&results),
855+
@r"
856+
+----+-----+---------+-----+------+----------+
857+
| id | val | comment | id0 | val0 | comment0 |
858+
+----+-----+---------+-----+------+----------+
859+
| 1 | a | c1 | 1 | a | c1 |
860+
| 2 | b | c2 | 2 | b | c2 |
861+
| 3 | | c3 | 3 | | c3 |
862+
| 4 | | c4 | 4 | | c4 |
863+
| 5 | e | c5 | 5 | e | c5 |
864+
| 6 | f | c6 | 6 | f | c6 |
865+
+----+-----+---------+-----+------+----------+
866+
"
867+
);
868+
869+
Ok(())
870+
}
765871
}

0 commit comments

Comments
 (0)