Skip to content

Commit 9ab5291

Browse files
aalexandrov and alamb authored
fix(sql): fix a bug when planning semi- or antijoins (#20990)
- Closes #20989.

## Rationale for this change

The planner should be consistent with the expected SQL behavior—swapping the names of tables that have identical structure in a SQL query should not affect the schema for that query.

## What changes are included in this PR?

- A fix in the `exclude_using_columns` helper method in `datafusion/expr/src/utils.rs` that ensures that we don't retain columns from the projected side when deciding which USING columns to exclude and which to retain on top of semi- or antijoins.
- Regression tests for the change in `test_using_join_wildcard_schema_semi_anti`.

## Are these changes tested?

- Added a regression test.

## Are there any user-facing changes?

Yes, the change is user facing, but I doubt that this behavior is expected and is documented anywhere. If existing docs need to be updated, please point me to the concrete places and I can take a look.

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent b3b721f commit 9ab5291

File tree

2 files changed

+97
-23
lines changed

2 files changed

+97
-23
lines changed

datafusion/expr/src/utils.rs

Lines changed: 32 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -382,30 +382,39 @@ fn get_exprs_except_skipped(
382382
}
383383
}
384384

385-
/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice
386-
/// (once for each join side), but an unqualified wildcard should include it only once.
387-
/// This function returns the columns that should be excluded.
385+
/// When a JOIN has a USING clause, the join columns appear in the output
386+
/// schema once per side (for inner/outer joins) or once total (for semi/anti
387+
/// joins). An unqualified wildcard should include each USING column only once.
388+
/// This function returns the duplicate columns that should be excluded.
388389
fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
389-
let using_columns = plan.using_columns()?;
390-
let excluded = using_columns
391-
.into_iter()
392-
// For each USING JOIN condition, only expand to one of each join column in projection
393-
.flat_map(|cols| {
394-
let mut cols = cols.into_iter().collect::<Vec<_>>();
395-
// sort join columns to make sure we consistently keep the same
396-
// qualified column
397-
cols.sort();
398-
let mut out_column_names: HashSet<String> = HashSet::new();
399-
cols.into_iter().filter_map(move |c| {
400-
if out_column_names.contains(&c.name) {
401-
Some(c)
402-
} else {
403-
out_column_names.insert(c.name);
404-
None
405-
}
406-
})
407-
})
408-
.collect::<HashSet<_>>();
390+
let output_columns: HashSet<_> = plan.schema().columns().iter().cloned().collect();
391+
let mut excluded = HashSet::new();
392+
for cols in plan.using_columns()? {
393+
// `using_columns()` returns join columns from both sides regardless of
394+
// the join type. For semi/anti joins, only one side's columns appear in
395+
// the output schema. Filter to output columns so that columns from the
396+
// non-output side don't participate in the deduplication process below
397+
// and displace real output columns.
398+
let mut cols: Vec<_> = cols
399+
.into_iter()
400+
.filter(|c| output_columns.contains(c))
401+
.collect();
402+
403+
// Sort so we keep the same qualified column, regardless of HashSet
404+
// iteration order.
405+
cols.sort();
406+
407+
// Keep only one column per name from the columns set, adding any
408+
// duplicates to the excluded set.
409+
let mut seen_names = HashSet::new();
410+
for col in cols {
411+
if seen_names.contains(col.name.as_str()) {
412+
excluded.insert(col); // exclude columns with already seen name
413+
} else {
414+
seen_names.insert(col.name.clone()); // mark column name as seen
415+
}
416+
}
417+
}
409418
Ok(excluded)
410419
}
411420

datafusion/sql/tests/sql_integration.rs

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5004,6 +5004,71 @@ fn test_using_join_wildcard_schema() {
50045004
);
50055005
}
50065006

5007+
#[test]
5008+
fn test_using_join_wildcard_schema_semi_anti() {
5009+
let s_columns = &["s.x1", "s.x2", "s.x3"];
5010+
let t_columns = &["t.x1", "t.x2", "t.x3"];
5011+
5012+
let sql = "WITH
5013+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5014+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5015+
SELECT * FROM s LEFT SEMI JOIN t USING (x1)";
5016+
let plan = logical_plan(sql).unwrap();
5017+
assert_eq!(plan.schema().field_names(), s_columns);
5018+
5019+
let sql = "WITH
5020+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5021+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5022+
SELECT * FROM t RIGHT SEMI JOIN s USING (x1)";
5023+
let plan = logical_plan(sql).unwrap();
5024+
assert_eq!(plan.schema().field_names(), s_columns);
5025+
5026+
let sql = "WITH
5027+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5028+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5029+
SELECT * FROM s LEFT ANTI JOIN t USING (x1)";
5030+
let plan = logical_plan(sql).unwrap();
5031+
assert_eq!(plan.schema().field_names(), s_columns);
5032+
5033+
let sql = "WITH
5034+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5035+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5036+
SELECT * FROM t RIGHT ANTI JOIN s USING (x1)";
5037+
let plan = logical_plan(sql).unwrap();
5038+
assert_eq!(plan.schema().field_names(), s_columns);
5039+
5040+
// Same as above, but with swapped s and t sides.
5041+
// Tests the issue fixed with #20990.
5042+
5043+
let sql = "WITH
5044+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5045+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5046+
SELECT * FROM t LEFT SEMI JOIN s USING (x1)";
5047+
let plan = logical_plan(sql).unwrap();
5048+
assert_eq!(plan.schema().field_names(), t_columns);
5049+
5050+
let sql = "WITH
5051+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5052+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5053+
SELECT * FROM s RIGHT SEMI JOIN t USING (x1)";
5054+
let plan = logical_plan(sql).unwrap();
5055+
assert_eq!(plan.schema().field_names(), t_columns);
5056+
5057+
let sql = "WITH
5058+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5059+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5060+
SELECT * FROM t LEFT ANTI JOIN s USING (x1)";
5061+
let plan = logical_plan(sql).unwrap();
5062+
assert_eq!(plan.schema().field_names(), t_columns);
5063+
5064+
let sql = "WITH
5065+
s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3),
5066+
t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3)
5067+
SELECT * FROM s RIGHT ANTI JOIN t USING (x1)";
5068+
let plan = logical_plan(sql).unwrap();
5069+
assert_eq!(plan.schema().field_names(), t_columns);
5070+
}
5071+
50075072
#[test]
50085073
fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation()
50095074
{

0 commit comments

Comments
 (0)