Skip to content

Commit 9c6e98f

Browse files
viirya and claude committed
feat: Add multi-column support for null-aware anti joins
This commit extends null-aware anti join functionality to support multiple columns, enabling queries like:

    SELECT * FROM t1 WHERE (a, b) NOT IN (SELECT x, y FROM t2);

and correlated multi-column NOT IN subqueries:

    SELECT * FROM t1 WHERE (c2, c3) NOT IN (
        SELECT c2, c3 FROM t2 WHERE t1.c1 = t2.c1
    );

Changes:

Physical Execution Layer:
- Remove single-column validation restriction in HashJoinExec
- Extend NULL detection in probe phase to check ANY column for NULLs
- Extend NULL filtering in final phase to filter rows with ANY NULL column
- Add comprehensive unit tests for 2-column and 3-column joins

SQL Planning Layer:
- Allow tuple expressions in parse_in_subquery()
- Add validation for tuple field count matching

Query Optimization Layer:
- Update InSubquery validation to allow struct expressions
- Skip type coercion for struct expressions (handled in decorrelation)
- Implement struct decomposition in decorrelate_predicate_subquery
- Decompose struct(a, b) into individual join conditions a = x AND b = y
- Handle both correlated and non-correlated multi-column subqueries

Test Coverage:
- Add 7 new SQL logic test cases (Tests 19-25)
- Add 3 unit test functions with 15 test variants (5 batch sizes each)
- Cover 2-column, 3-column, empty subquery, and NULL patterns
- Include correlated multi-column NOT IN from issue #10583

Test Results:
- 31/31 null-aware anti join tests passing
- 369/369 total hash join tests passing
- All optimizer tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 1ab7e41 commit 9c6e98f

File tree

7 files changed

+572
-67
lines changed

7 files changed

+572
-67
lines changed

datafusion/expr/src/logical_plan/invariants.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,14 +222,33 @@ pub fn check_subquery_expr(
222222
check_correlations_in_subquery(inner_plan)
223223
} else {
224224
if let Expr::InSubquery(subquery) = expr {
225-
// InSubquery should only return one column
226-
if subquery.subquery.subquery.schema().fields().len() > 1 {
225+
// InSubquery should only return one column UNLESS the left expression is a struct
226+
// (multi-column IN like: (a, b) NOT IN (SELECT x, y FROM ...))
227+
let is_struct = matches!(*subquery.expr, Expr::ScalarFunction(ref func) if func.func.name() == "struct");
228+
229+
let num_subquery_cols = subquery.subquery.subquery.schema().fields().len();
230+
231+
if !is_struct && num_subquery_cols > 1 {
227232
return plan_err!(
228233
"InSubquery should only return one column, but found {}: {}",
229-
subquery.subquery.subquery.schema().fields().len(),
234+
num_subquery_cols,
230235
subquery.subquery.subquery.schema().field_names().join(", ")
231236
);
232237
}
238+
239+
// For struct expressions, validate that the number of fields matches
240+
if is_struct {
241+
if let Expr::ScalarFunction(ref func) = *subquery.expr {
242+
let num_tuple_cols = func.args.len();
243+
if num_tuple_cols != num_subquery_cols {
244+
return plan_err!(
245+
"The number of columns in the tuple ({}) must match the number of columns in the subquery ({})",
246+
num_tuple_cols,
247+
num_subquery_cols
248+
);
249+
}
250+
}
251+
}
233252
}
234253
if let Expr::SetComparison(set_comparison) = expr
235254
&& set_comparison.subquery.subquery.schema().fields().len() > 1

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -482,23 +482,43 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
482482
Arc::unwrap_or_clone(subquery.subquery),
483483
)?
484484
.data;
485-
let expr_type = expr.get_type(self.schema)?;
486-
let subquery_type = new_plan.schema().field(0).data_type();
487-
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(
488-
plan_datafusion_err!(
489-
"expr type {expr_type} can't cast to {subquery_type} in InSubquery"
490-
),
491-
)?;
492-
let new_subquery = Subquery {
493-
subquery: Arc::new(new_plan),
494-
outer_ref_columns: subquery.outer_ref_columns,
495-
spans: subquery.spans,
496-
};
497-
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
498-
Box::new(expr.cast_to(&common_type, self.schema)?),
499-
cast_subquery(new_subquery, &common_type)?,
500-
negated,
501-
))))
485+
486+
// Check if this is a multi-column IN (struct expression)
487+
let is_struct = matches!(*expr, Expr::ScalarFunction(ref func) if func.func.name() == "struct");
488+
489+
if is_struct {
490+
// For multi-column IN, we don't need type coercion at this level
491+
// The decorrelation phase will handle this by creating join conditions
492+
let new_subquery = Subquery {
493+
subquery: Arc::new(new_plan),
494+
outer_ref_columns: subquery.outer_ref_columns,
495+
spans: subquery.spans,
496+
};
497+
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
498+
expr,
499+
new_subquery,
500+
negated,
501+
))))
502+
} else {
503+
// Single-column IN: apply type coercion as before
504+
let expr_type = expr.get_type(self.schema)?;
505+
let subquery_type = new_plan.schema().field(0).data_type();
506+
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(
507+
plan_datafusion_err!(
508+
"expr type {expr_type} can't cast to {subquery_type} in InSubquery"
509+
),
510+
)?;
511+
let new_subquery = Subquery {
512+
subquery: Arc::new(new_plan),
513+
outer_ref_columns: subquery.outer_ref_columns,
514+
spans: subquery.spans,
515+
};
516+
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
517+
Box::new(expr.cast_to(&common_type, self.schema)?),
518+
cast_subquery(new_subquery, &common_type)?,
519+
negated,
520+
))))
521+
}
502522
}
503523
Expr::SetComparison(SetComparison {
504524
expr,

datafusion/optimizer/src/decorrelate_predicate_subquery.rs

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,57 @@ fn build_join(
386386
right,
387387
})),
388388
) => {
389-
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
390-
let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
391-
in_predicate.and(join_filter)
389+
// Check if this is a multi-column IN (struct expression)
390+
if let Expr::ScalarFunction(func) = left.deref() {
391+
if func.func.name() == "struct" {
392+
// Decompose struct into individual field comparisons
393+
let struct_args = &func.args;
394+
395+
// The right side should be the subquery result
396+
// Note: After pull-up, the subquery may have additional correlated columns
397+
// We only care about the first N columns that match our struct fields
398+
let subquery_fields = sub_query_alias.schema().fields();
399+
400+
if struct_args.len() > subquery_fields.len() {
401+
return plan_err!(
402+
"Struct field count ({}) exceeds subquery column count ({})",
403+
struct_args.len(),
404+
subquery_fields.len()
405+
);
406+
}
407+
408+
// Create equality conditions for each field
409+
let mut conditions = Vec::new();
410+
for (i, arg) in struct_args.iter().enumerate() {
411+
let field = &subquery_fields[i];
412+
let right_col = Expr::Column(Column::new(
413+
Some(alias.clone()),
414+
field.name().to_string(),
415+
));
416+
conditions.push(Expr::eq(arg.clone(), right_col));
417+
}
418+
419+
// Combine all conditions with AND
420+
let in_predicate = conditions
421+
.into_iter()
422+
.reduce(|acc, cond| acc.and(cond))
423+
.unwrap_or(lit(true));
424+
425+
in_predicate.and(join_filter)
426+
} else {
427+
// Regular scalar function, handle as before
428+
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
429+
let in_predicate =
430+
Expr::eq(left.deref().clone(), Expr::Column(right_col));
431+
in_predicate.and(join_filter)
432+
}
433+
} else {
434+
// Not a struct, handle as before
435+
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
436+
let in_predicate =
437+
Expr::eq(left.deref().clone(), Expr::Column(right_col));
438+
in_predicate.and(join_filter)
439+
}
392440
}
393441
(Some(join_filter), _) => join_filter,
394442
(
@@ -399,9 +447,51 @@ fn build_join(
399447
right,
400448
})),
401449
) => {
402-
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
450+
// Check if this is a multi-column IN (struct expression)
451+
if let Expr::ScalarFunction(func) = left.deref() {
452+
if func.func.name() == "struct" {
453+
// Decompose struct into individual field comparisons
454+
let struct_args = &func.args;
455+
456+
// The right side should be the subquery result
457+
// Note: After pull-up, the subquery may have additional correlated columns
458+
// We only care about the first N columns that match our struct fields
459+
let subquery_fields = sub_query_alias.schema().fields();
460+
461+
if struct_args.len() > subquery_fields.len() {
462+
return plan_err!(
463+
"Struct field count ({}) exceeds subquery column count ({})",
464+
struct_args.len(),
465+
subquery_fields.len()
466+
);
467+
}
468+
469+
// Create equality conditions for each field
470+
let mut conditions = Vec::new();
471+
for (i, arg) in struct_args.iter().enumerate() {
472+
let field = &subquery_fields[i];
473+
let right_col = Expr::Column(Column::new(
474+
Some(alias.clone()),
475+
field.name().to_string(),
476+
));
477+
conditions.push(Expr::eq(arg.clone(), right_col));
478+
}
403479

404-
Expr::eq(left.deref().clone(), Expr::Column(right_col))
480+
// Combine all conditions with AND
481+
conditions
482+
.into_iter()
483+
.reduce(|acc, cond| acc.and(cond))
484+
.unwrap_or(lit(true))
485+
} else {
486+
// Regular scalar function, handle as before
487+
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
488+
Expr::eq(left.deref().clone(), Expr::Column(right_col))
489+
}
490+
} else {
491+
// Not a struct, handle as before
492+
let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
493+
Expr::eq(left.deref().clone(), Expr::Column(right_col))
494+
}
405495
}
406496
(None, None) => lit(true),
407497
_ => return Ok(None),

0 commit comments

Comments
 (0)