Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 197 additions & 7 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::sync::Arc;

use datafusion_common::JoinType;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, plan_err};
use datafusion_common::{Column, DFSchemaRef, Result, ScalarValue, plan_err};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{EmptyRelation, Projection, Union};
use datafusion_expr::{EmptyRelation, Expr, Projection, Union, cast, lit};

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -73,12 +73,8 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(Transformed::no(plan))
}
LogicalPlan::Join(ref join) => {
// TODO: For Join, more join type need to be careful:
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
let left_field_count = join.left.schema().fields().len();

match join.join_type {
// For Full Join, the Join result is empty only when both sides are empty.
Expand All @@ -88,6 +84,24 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
// For Full Join, if one side is empty, replace with a
// Projection that null-pads the empty side's columns.
JoinType::Full if right_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.left),
&join.schema,
left_field_count,
true,
)?))
}
JoinType::Full if left_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.right),
&join.schema,
left_field_count,
false,
)?))
}
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
Expand All @@ -100,12 +114,32 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
// Left Join with empty right: all left rows survive
// with NULLs for right columns.
JoinType::Left if right_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.left),
&join.schema,
left_field_count,
true,
)?))
}
JoinType::Right if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(&join.schema),
}),
)),
// Right Join with empty left: all right rows survive
// with NULLs for left columns.
JoinType::Right if left_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.right),
&join.schema,
left_field_count,
false,
)?))
}
JoinType::LeftSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
Expand Down Expand Up @@ -230,6 +264,57 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
}
}

/// Rewrites an outer join with one empty input as a `Projection` over the
/// input that still produces rows.
///
/// Every field of `join_schema` becomes one projection expression, in schema
/// order:
///
/// * a field that belongs to the surviving input is emitted as a plain column
///   reference built from the field's qualifier and name;
/// * a field that belongs to the empty input is emitted as
///   `CAST(NULL AS <field type>)`, aliased back to the field's qualifier and
///   name.
///
/// The projection is created with `join_schema` itself as its output schema
/// rather than a schema derived from the expressions, so the plan keeps the
/// nullability recorded by the join (for a FULL JOIN the surviving side's
/// columns are nullable in the join schema even when they are not nullable in
/// the source schema).
///
/// # Arguments
///
/// * `surviving_plan` - the join input that is kept as the projection's child.
/// * `join_schema` - output schema of the join being replaced.
/// * `left_field_count` - number of leading fields of `join_schema` that come
///   from the join's left input; all remaining fields are treated as coming
///   from the right input.
/// * `empty_side_is_right` - `true` when the right input is the empty one,
///   `false` when the left input is.
///
/// # Errors
///
/// Propagates any error returned by `Projection::try_new_with_schema`.
///
/// # Example
///
/// A `LEFT JOIN` of `orders` with an empty right side that contributes the
/// columns `order_id: Int64` and `reason: Utf8`:
///
/// ```text
/// before:  Left Join (orders.id = returns.order_id)
///            TableScan: orders
///            EmptyRelation
///
/// after:   Projection(orders.id, orders.amount,
///                     CAST(NULL AS Int64) AS order_id,
///                     CAST(NULL AS Utf8) AS reason)
///            TableScan: orders
/// ```
fn build_null_padded_projection(
    surviving_plan: Arc<LogicalPlan>,
    join_schema: &DFSchemaRef,
    left_field_count: usize,
    empty_side_is_right: bool,
) -> Result<LogicalPlan> {
    let mut projected = Vec::new();

    for (position, (relation, field)) in join_schema.iter().enumerate() {
        let from_left_input = position < left_field_count;

        // A left-side field survives exactly when the right side is the empty
        // one, and a right-side field survives exactly when it is not.
        let expr = if from_left_input == empty_side_is_right {
            Expr::Column(Column::new(relation.cloned(), field.name()))
        } else {
            let typed_null = cast(lit(ScalarValue::Null), field.data_type().clone());
            typed_null.alias_qualified(relation.cloned(), field.name())
        };

        projected.push(expr);
    }

    Projection::try_new_with_schema(projected, surviving_plan, Arc::clone(join_schema))
        .map(LogicalPlan::Projection)
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -570,6 +655,111 @@ mod tests {
assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
}

/// LEFT JOIN whose right input is a scan filtered by a constant `false`:
/// the expected plan is a projection over the left scan with typed NULLs in
/// place of the right-side columns.
#[test]
fn test_left_join_right_empty_null_pad() -> Result<()> {
    let left_input = LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;

    // `WHERE false` on the right scan.
    let empty_right_input = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
        .filter(lit(false))?
        .build()?;

    let using_columns = vec![Column::from_name("a".to_string())];
    let plan = LogicalPlanBuilder::from(left_input)
        .join_using(empty_right_input, JoinType::Left, using_columns)?
        .build()?;

    assert_together_optimized_plan(
        plan,
        "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left",
        true,
    )
}

/// RIGHT JOIN whose left input is a scan filtered by a constant `false`:
/// the expected plan is a projection over the right scan with typed NULLs in
/// place of the left-side columns.
#[test]
fn test_right_join_left_empty_null_pad() -> Result<()> {
    // `WHERE false` on the left scan.
    let empty_left_input = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
        .filter(lit(false))?
        .build()?;

    let right_input = LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;

    let using_columns = vec![Column::from_name("a".to_string())];
    let plan = LogicalPlanBuilder::from(empty_left_input)
        .join_using(right_input, JoinType::Right, using_columns)?
        .build()?;

    assert_together_optimized_plan(
        plan,
        "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right",
        true,
    )
}

/// FULL JOIN whose right input is a scan filtered by a constant `false`:
/// the expected plan is a projection over the left scan with typed NULLs in
/// place of the right-side columns.
#[test]
fn test_full_join_right_empty_null_pad() -> Result<()> {
    let left_input = LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;

    // `WHERE false` on the right scan.
    let empty_right_input = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
        .filter(lit(false))?
        .build()?;

    let using_columns = vec![Column::from_name("a".to_string())];
    let plan = LogicalPlanBuilder::from(left_input)
        .join_using(empty_right_input, JoinType::Full, using_columns)?
        .build()?;

    assert_together_optimized_plan(
        plan,
        "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left",
        true,
    )
}

/// FULL JOIN whose left input is a scan filtered by a constant `false`:
/// the expected plan is a projection over the right scan with typed NULLs in
/// place of the left-side columns.
#[test]
fn test_full_join_left_empty_null_pad() -> Result<()> {
    // `WHERE false` on the left scan.
    let empty_left_input = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
        .filter(lit(false))?
        .build()?;

    let right_input = LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;

    let using_columns = vec![Column::from_name("a".to_string())];
    let plan = LogicalPlanBuilder::from(empty_left_input)
        .join_using(right_input, JoinType::Full, using_columns)?
        .build()?;

    assert_together_optimized_plan(
        plan,
        "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right",
        true,
    )
}

/// LEFT JOIN with both an equi-join key and an extra non-equi filter, whose
/// right input is a scan filtered by a constant `false`: the expected plan is
/// the same null-padded projection over the left scan as in the plain
/// `USING` case.
#[test]
fn test_left_join_complex_on_right_empty_null_pad() -> Result<()> {
    let left_input = LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;

    // `WHERE false` on the right scan.
    let empty_right_input = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
        .filter(lit(false))?
        .build()?;

    // ON left.a = right.a AND left.b > right.b
    let equi_keys = (
        vec![Column::from_name("a".to_string())],
        vec![Column::from_name("a".to_string())],
    );
    let residual_filter = col("left.b").gt(col("right.b"));

    let plan = LogicalPlanBuilder::from(left_input)
        .join(
            empty_right_input,
            JoinType::Left,
            equi_keys,
            Some(residual_filter),
        )?
        .build()?;

    assert_together_optimized_plan(
        plan,
        "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left",
        true,
    )
}

#[test]
fn test_empty_with_non_empty() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down
Loading
Loading