Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 119 additions & 1 deletion datafusion/core/tests/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::coop::CooperativeExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
Expand Down Expand Up @@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {

Ok(())
}

/// Regression test for <https://github.com/apache/datafusion/issues/21459>
///
/// `ProjectionPushdown` must not panic when a `ProjectionExec` sits on top of
/// a `FilterExec` that already carries an embedded projection.
///
/// Prior to the fix, `FilterExecBuilder::from(self)` kept the stale projection
/// indices (e.g. `[0, 1, 2]`); after the swap the new input exposed only two
/// columns, so `.build()` panicked with "project index out of bounds".
#[test]
fn test_filter_with_embedded_projection_after_projection() -> Result<()> {
    // Scan producing five columns: [a, b, c, d, e].
    let source = create_simple_csv_exec();

    // FilterExec evaluating `a > 0` and narrowing its output to [a, b, c]
    // through an embedded projection.
    let gt_zero = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
    ));
    let filtered: Arc<dyn ExecutionPlan> = Arc::new(
        FilterExecBuilder::new(gt_zero, source)
            .apply_projection(Some(vec![0, 1, 2]))?
            .build()?,
    );

    // Outer projection narrows [a, b, c] down to [a, b].
    let exprs = vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
    ];
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(ProjectionExec::try_new(exprs, filtered)?);

    let before = displayable(plan.as_ref()).indent(true).to_string();
    assert_snapshot!(
        before.trim(),
        @r"
    ProjectionExec: expr=[a@0 as a, b@1 as b]
      FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
    "
    );

    // The optimizer pass must complete without panicking.
    let optimized =
        ProjectionPushdown::new().optimize(plan, &ConfigOptions::new())?;
    let after = displayable(optimized.as_ref()).indent(true).to_string();
    assert_snapshot!(
        after.trim(),
        @r"
    FilterExec: a@0 > 0
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
    "
    );

    Ok(())
}

/// Same as above, but the outer ProjectionExec also renames columns.
/// Ensures the rename is preserved after the projection pushdown swap.
#[test]
fn test_filter_with_embedded_projection_after_renaming_projection() -> Result<()> {
    let source = create_simple_csv_exec();

    // FilterExec evaluating `b > 10` with an embedded projection
    // [0, 1, 2, 3], i.e. output columns [a, b, c, d].
    let gt_ten = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("b", 1)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    ));
    let filtered: Arc<dyn ExecutionPlan> = Arc::new(
        FilterExecBuilder::new(gt_ten, source)
            .apply_projection(Some(vec![0, 1, 2, 3]))?
            .build()?,
    );

    // Outer projection both narrows and renames: [a as x, b as y].
    let renamed = vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
    ];
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(ProjectionExec::try_new(renamed, filtered)?);

    let before = displayable(plan.as_ref()).indent(true).to_string();
    assert_snapshot!(
        before.trim(),
        @r"
    ProjectionExec: expr=[a@0 as x, b@1 as y]
      FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
    "
    );

    // After pushdown the filter predicate must reference the renamed
    // column `y`, and the rename itself lands in the data source.
    let optimized =
        ProjectionPushdown::new().optimize(plan, &ConfigOptions::new())?;
    let after = displayable(optimized.as_ref()).indent(true).to_string();
    assert_snapshot!(
        after.trim(),
        @r"
    FilterExec: y@1 > 10
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
    "
    );

    Ok(())
}
67 changes: 67 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ impl ExecutionPlan for FilterExec {
return FilterExecBuilder::from(self)
.with_input(make_with_child(projection, self.input())?)
.with_predicate(new_predicate)
// The original FilterExec projection referenced columns from its old
// input. After the swap the new input is the ProjectionExec which
// already handles column selection, so clear the projection here.
.apply_projection(None)?
.build()
.map(|e| Some(Arc::new(e) as _));
}
Expand Down Expand Up @@ -2572,4 +2576,67 @@ mod tests {
);
Ok(())
}

/// Regression test: ProjectionExec on top of a FilterExec that already has
/// an explicit projection must not panic when `try_swapping_with_projection`
/// attempts to swap the two nodes.
///
/// Before the fix, `FilterExecBuilder::from(self)` copied the old projection
/// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the
/// input with the narrower ProjectionExec (2 columns), `.build()` tried to
/// validate the stale `[0, 1, 2]` projection against the 2-column schema and
/// panicked with "project index 2 out of bounds, max field 2".
#[test]
fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
use crate::projection::ProjectionExpr;
use datafusion_physical_expr::expressions::col;

// Schema: [ts: Int64, tokens: Int64, svc: Utf8]
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Int64, false),
Field::new("tokens", DataType::Int64, false),
Field::new("svc", DataType::Utf8, false),
]));
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

// FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols)
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("ts", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
));
let filter = Arc::new(
FilterExecBuilder::new(predicate, input)
.apply_projection(Some(vec![0, 1, 2]))?
.build()?,
);

// ProjectionExec: narrows to [ts, tokens] (drops svc)
let proj_exprs = vec![
ProjectionExpr {
expr: col("ts", &filter.schema())?,
alias: "ts".to_string(),
},
ProjectionExpr {
expr: col("tokens", &filter.schema())?,
alias: "tokens".to_string(),
},
];
let projection = Arc::new(ProjectionExec::try_new(
proj_exprs,
Arc::clone(&filter) as _,
)?);

// This must not panic
let result = filter.try_swapping_with_projection(&projection)?;
assert!(result.is_some(), "swap should succeed");

let new_plan = result.unwrap();
// Output schema must still be [ts, tokens]
let out_schema = new_plan.schema();
assert_eq!(out_schema.fields().len(), 2);
assert_eq!(out_schema.field(0).name(), "ts");
assert_eq!(out_schema.field(1).name(), "tokens");
Ok(())
}
}
Loading