diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 95cd34b6e455..6018d714c595 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::coop::CooperativeExec;
-use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
 use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use datafusion_physical_plan::joins::{
     HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
@@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {
 
     Ok(())
 }
+
+/// Regression test for
+///
+/// When a `ProjectionExec` sits on top of a `FilterExec` that already carries
+/// an embedded projection, the `ProjectionPushdown` optimizer must not panic.
+///
+/// Before the fix, `FilterExecBuilder::from(self)` copied stale projection
+/// indices (e.g. `[0, 1, 2]`). After swapping, the new input was narrower
+/// (2 columns), so `.build()` panicked with "project index out of bounds".
+#[test]
+fn test_filter_with_embedded_projection_after_projection() -> Result<()> {
+    // DataSourceExec: [a, b, c, d, e]
+    let csv = create_simple_csv_exec();
+
+    // FilterExec: a > 0, projection=[0, 1, 2] → output: [a, b, c]
+    let predicate = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new("a", 0)),
+        Operator::Gt,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
+    ));
+    let filter: Arc<dyn ExecutionPlan> = Arc::new(
+        FilterExecBuilder::new(predicate, csv)
+            .apply_projection(Some(vec![0, 1, 2]))?
+            .build()?,
+    );
+
+    // ProjectionExec: narrows [a, b, c] → [a, b]
+    let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+        vec![
+            ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
+            ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
+        ],
+        filter,
+    )?);
+
+    let initial = displayable(projection.as_ref()).indent(true).to_string();
+    let actual = initial.trim();
+    assert_snapshot!(
+        actual,
+        @r"
+    ProjectionExec: expr=[a@0 as a, b@1 as b]
+      FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+    "
+    );
+
+    // This must not panic
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+    let after_optimize_string = displayable(after_optimize.as_ref())
+        .indent(true)
+        .to_string();
+    let actual = after_optimize_string.trim();
+    assert_snapshot!(
+        actual,
+        @r"
+    FilterExec: a@0 > 0
+      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
+    "
+    );
+
+    Ok(())
+}
+
+/// Same as above, but the outer ProjectionExec also renames columns.
+/// Ensures the rename is preserved after the projection pushdown swap.
+#[test]
+fn test_filter_with_embedded_projection_after_renaming_projection() -> Result<()> {
+    let csv = create_simple_csv_exec();
+
+    // FilterExec: b > 10, projection=[0, 1, 2, 3] → output: [a, b, c, d]
+    let predicate = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new("b", 1)),
+        Operator::Gt,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+    ));
+    let filter: Arc<dyn ExecutionPlan> = Arc::new(
+        FilterExecBuilder::new(predicate, csv)
+            .apply_projection(Some(vec![0, 1, 2, 3]))?
+            .build()?,
+    );
+
+    // ProjectionExec: [a as x, b as y] — narrows and renames
+    let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+        vec![
+            ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
+            ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
+        ],
+        filter,
+    )?);
+
+    let initial = displayable(projection.as_ref()).indent(true).to_string();
+    let actual = initial.trim();
+    assert_snapshot!(
+        actual,
+        @r"
+    ProjectionExec: expr=[a@0 as x, b@1 as y]
+      FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+    "
+    );
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+    let after_optimize_string = displayable(after_optimize.as_ref())
+        .indent(true)
+        .to_string();
+    let actual = after_optimize_string.trim();
+    assert_snapshot!(
+        actual,
+        @r"
+    FilterExec: y@1 > 10
+      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
+    "
+    );
+
+    Ok(())
+}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index afe2b0ae810a..8720d5f7d223 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -568,6 +568,10 @@ impl ExecutionPlan for FilterExec {
             return FilterExecBuilder::from(self)
                 .with_input(make_with_child(projection, self.input())?)
                 .with_predicate(new_predicate)
+                // The original FilterExec projection referenced columns from its old
+                // input. After the swap the new input is the ProjectionExec which
+                // already handles column selection, so clear the projection here.
+                .apply_projection(None)?
                 .build()
                 .map(|e| Some(Arc::new(e) as _));
         }
@@ -2572,4 +2576,67 @@ mod tests {
         );
         Ok(())
     }
+
+    /// Regression test: ProjectionExec on top of a FilterExec that already has
+    /// an explicit projection must not panic when `try_swapping_with_projection`
+    /// attempts to swap the two nodes.
+    ///
+    /// Before the fix, `FilterExecBuilder::from(self)` copied the old projection
+    /// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the
+    /// input with the narrower ProjectionExec (2 columns), `.build()` tried to
+    /// validate the stale `[0, 1, 2]` projection against the 2-column schema and
+    /// panicked with "project index 2 out of bounds, max field 2".
+    #[test]
+    fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
+        use crate::projection::ProjectionExpr;
+        use datafusion_physical_expr::expressions::col;
+
+        // Schema: [ts: Int64, tokens: Int64, svc: Utf8]
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ts", DataType::Int64, false),
+            Field::new("tokens", DataType::Int64, false),
+            Field::new("svc", DataType::Utf8, false),
+        ]));
+        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+        // FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols)
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("ts", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
+        ));
+        let filter = Arc::new(
+            FilterExecBuilder::new(predicate, input)
+                .apply_projection(Some(vec![0, 1, 2]))?
+                .build()?,
+        );
+
+        // ProjectionExec: narrows to [ts, tokens] (drops svc)
+        let proj_exprs = vec![
+            ProjectionExpr {
+                expr: col("ts", &filter.schema())?,
+                alias: "ts".to_string(),
+            },
+            ProjectionExpr {
+                expr: col("tokens", &filter.schema())?,
+                alias: "tokens".to_string(),
+            },
+        ];
+        let projection = Arc::new(ProjectionExec::try_new(
+            proj_exprs,
+            Arc::clone(&filter) as _,
+        )?);
+
+        // This must not panic
+        let result = filter.try_swapping_with_projection(&projection)?;
+        assert!(result.is_some(), "swap should succeed");
+
+        let new_plan = result.unwrap();
+        // Output schema must still be [ts, tokens]
+        let out_schema = new_plan.schema();
+        assert_eq!(out_schema.fields().len(), 2);
+        assert_eq!(out_schema.field(0).name(), "ts");
+        assert_eq!(out_schema.field(1).name(), "tokens");
+        Ok(())
+    }
 }