Skip to content

Commit 91c2e04

Browse files
authored
fix: FilterExec should drop projection when apply projection pushdown (apache#21460)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#21459 ## Rationale for this change When a `ProjectionExec` sits on top of a `FilterExec` that already carries an explicit projection, the `ProjectionPushdown` optimizer attempts to swap them via `try_swapping_with_projection`. The swap replaces the `FilterExec's` input with the narrower `ProjectionExec`, but `FilterExecBuilder::from(self)` carried over the old projection indices (e.g. [0, 1, 2]). After the swap the new input only has the columns selected by the `ProjectionExec` (e.g. 2 columns), so .build() tries to validate the stale projection against the narrower schema and panics with "project index 2 out of bounds, max field 2". ## What changes are included in this PR? In `FilterExec::try_swapping_with_projection`, after replacing the input with the narrower ProjectionExec, clear the FilterExec's own projection via .`apply_projection(None)`. The ProjectionExec that is now the input already handles column selection, so the FilterExec no longer needs its own projection. ## Are these changes tested? yes, add test case ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 5ba06ac commit 91c2e04

File tree

2 files changed

+186
-1
lines changed

2 files changed

+186
-1
lines changed

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
4646
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
4747
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
4848
use datafusion_physical_plan::coop::CooperativeExec;
49-
use datafusion_physical_plan::filter::FilterExec;
49+
use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
5050
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
5151
use datafusion_physical_plan::joins::{
5252
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
@@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {
17541754

17551755
Ok(())
17561756
}
1757+
1758+
/// Regression test for <https://github.com/apache/datafusion/issues/21459>
1759+
///
1760+
/// When a `ProjectionExec` sits on top of a `FilterExec` that already carries
1761+
/// an embedded projection, the `ProjectionPushdown` optimizer must not panic.
1762+
///
1763+
/// Before the fix, `FilterExecBuilder::from(self)` copied stale projection
1764+
/// indices (e.g. `[0, 1, 2]`). After swapping, the new input was narrower
1765+
/// (2 columns), so `.build()` panicked with "project index out of bounds".
1766+
#[test]
1767+
fn test_filter_with_embedded_projection_after_projection() -> Result<()> {
1768+
// DataSourceExec: [a, b, c, d, e]
1769+
let csv = create_simple_csv_exec();
1770+
1771+
// FilterExec: a > 0, projection=[0, 1, 2] → output: [a, b, c]
1772+
let predicate = Arc::new(BinaryExpr::new(
1773+
Arc::new(Column::new("a", 0)),
1774+
Operator::Gt,
1775+
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1776+
));
1777+
let filter: Arc<dyn ExecutionPlan> = Arc::new(
1778+
FilterExecBuilder::new(predicate, csv)
1779+
.apply_projection(Some(vec![0, 1, 2]))?
1780+
.build()?,
1781+
);
1782+
1783+
// ProjectionExec: narrows [a, b, c] → [a, b]
1784+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1785+
vec![
1786+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1787+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1788+
],
1789+
filter,
1790+
)?);
1791+
1792+
let initial = displayable(projection.as_ref()).indent(true).to_string();
1793+
let actual = initial.trim();
1794+
assert_snapshot!(
1795+
actual,
1796+
@r"
1797+
ProjectionExec: expr=[a@0 as a, b@1 as b]
1798+
FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
1799+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1800+
"
1801+
);
1802+
1803+
// This must not panic
1804+
let after_optimize =
1805+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1806+
let after_optimize_string = displayable(after_optimize.as_ref())
1807+
.indent(true)
1808+
.to_string();
1809+
let actual = after_optimize_string.trim();
1810+
assert_snapshot!(
1811+
actual,
1812+
@r"
1813+
FilterExec: a@0 > 0
1814+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
1815+
"
1816+
);
1817+
1818+
Ok(())
1819+
}
1820+
1821+
/// Same as above, but the outer ProjectionExec also renames columns.
1822+
/// Ensures the rename is preserved after the projection pushdown swap.
1823+
#[test]
1824+
fn test_filter_with_embedded_projection_after_renaming_projection() -> Result<()> {
1825+
let csv = create_simple_csv_exec();
1826+
1827+
// FilterExec: b > 10, projection=[0, 1, 2, 3] → output: [a, b, c, d]
1828+
let predicate = Arc::new(BinaryExpr::new(
1829+
Arc::new(Column::new("b", 1)),
1830+
Operator::Gt,
1831+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1832+
));
1833+
let filter: Arc<dyn ExecutionPlan> = Arc::new(
1834+
FilterExecBuilder::new(predicate, csv)
1835+
.apply_projection(Some(vec![0, 1, 2, 3]))?
1836+
.build()?,
1837+
);
1838+
1839+
// ProjectionExec: [a as x, b as y] — narrows and renames
1840+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1841+
vec![
1842+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
1843+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
1844+
],
1845+
filter,
1846+
)?);
1847+
1848+
let initial = displayable(projection.as_ref()).indent(true).to_string();
1849+
let actual = initial.trim();
1850+
assert_snapshot!(
1851+
actual,
1852+
@r"
1853+
ProjectionExec: expr=[a@0 as x, b@1 as y]
1854+
FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
1855+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1856+
"
1857+
);
1858+
1859+
let after_optimize =
1860+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1861+
let after_optimize_string = displayable(after_optimize.as_ref())
1862+
.indent(true)
1863+
.to_string();
1864+
let actual = after_optimize_string.trim();
1865+
assert_snapshot!(
1866+
actual,
1867+
@r"
1868+
FilterExec: y@1 > 10
1869+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
1870+
"
1871+
);
1872+
1873+
Ok(())
1874+
}

datafusion/physical-plan/src/filter.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,10 @@ impl ExecutionPlan for FilterExec {
568568
return FilterExecBuilder::from(self)
569569
.with_input(make_with_child(projection, self.input())?)
570570
.with_predicate(new_predicate)
571+
// The original FilterExec projection referenced columns from its old
572+
// input. After the swap the new input is the ProjectionExec which
573+
// already handles column selection, so clear the projection here.
574+
.apply_projection(None)?
571575
.build()
572576
.map(|e| Some(Arc::new(e) as _));
573577
}
@@ -2572,4 +2576,67 @@ mod tests {
25722576
);
25732577
Ok(())
25742578
}
2579+
2580+
/// Regression test: ProjectionExec on top of a FilterExec that already has
2581+
/// an explicit projection must not panic when `try_swapping_with_projection`
2582+
/// attempts to swap the two nodes.
2583+
///
2584+
/// Before the fix, `FilterExecBuilder::from(self)` copied the old projection
2585+
/// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the
2586+
/// input with the narrower ProjectionExec (2 columns), `.build()` tried to
2587+
/// validate the stale `[0, 1, 2]` projection against the 2-column schema and
2588+
/// panicked with "project index 2 out of bounds, max field 2".
2589+
#[test]
2590+
fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
2591+
use crate::projection::ProjectionExpr;
2592+
use datafusion_physical_expr::expressions::col;
2593+
2594+
// Schema: [ts: Int64, tokens: Int64, svc: Utf8]
2595+
let schema = Arc::new(Schema::new(vec![
2596+
Field::new("ts", DataType::Int64, false),
2597+
Field::new("tokens", DataType::Int64, false),
2598+
Field::new("svc", DataType::Utf8, false),
2599+
]));
2600+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2601+
2602+
// FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols)
2603+
let predicate = Arc::new(BinaryExpr::new(
2604+
Arc::new(Column::new("ts", 0)),
2605+
Operator::Gt,
2606+
Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
2607+
));
2608+
let filter = Arc::new(
2609+
FilterExecBuilder::new(predicate, input)
2610+
.apply_projection(Some(vec![0, 1, 2]))?
2611+
.build()?,
2612+
);
2613+
2614+
// ProjectionExec: narrows to [ts, tokens] (drops svc)
2615+
let proj_exprs = vec![
2616+
ProjectionExpr {
2617+
expr: col("ts", &filter.schema())?,
2618+
alias: "ts".to_string(),
2619+
},
2620+
ProjectionExpr {
2621+
expr: col("tokens", &filter.schema())?,
2622+
alias: "tokens".to_string(),
2623+
},
2624+
];
2625+
let projection = Arc::new(ProjectionExec::try_new(
2626+
proj_exprs,
2627+
Arc::clone(&filter) as _,
2628+
)?);
2629+
2630+
// This must not panic
2631+
let result = filter.try_swapping_with_projection(&projection)?;
2632+
assert!(result.is_some(), "swap should succeed");
2633+
2634+
let new_plan = result.unwrap();
2635+
// Output schema must still be [ts, tokens]
2636+
let out_schema = new_plan.schema();
2637+
assert_eq!(out_schema.fields().len(), 2);
2638+
assert_eq!(out_schema.field(0).name(), "ts");
2639+
assert_eq!(out_schema.field(1).name(), "tokens");
2640+
Ok(())
2641+
}
25752642
}

0 commit comments

Comments
 (0)