Commit 2e7b8e1

wirybeaver and claude authored
fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent (#22523)
## Which issue does this PR close?

Related to apache/datafusion-ballista#1359

## Rationale

Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full `PhysicalOptimizer` chain after every completed stage. `FilterPushdown::new_post_optimization()` is not idempotent on plans containing `HashJoinExec`.

In the `Post` phase, `HashJoinExec::gather_filters_for_pushdown` unconditionally creates a new `DynamicFilterPhysicalExpr` and installs it on the probe-side child via `with_self_filter`. After pass 1 the join already carries a `dynamic_filter: Some(...)`, and the shared `Arc<DynamicFilterPhysicalExpr>` is already wired into the probe-side scan's predicate. On pass 2 a *second* dynamic filter is created and ANDed onto the existing predicate, producing `DynamicFilter AND DynamicFilter`. Each subsequent pass adds another duplicate, compounding indefinitely in AQE replan loops.

## What changes are included in this PR?

- **Guard in `HashJoinExec::gather_filters_for_pushdown`**: skip dynamic-filter creation when `self.dynamic_filter.is_some()`, meaning a previous pass already installed one. The existing `Arc` remains valid and correctly wired into the probe-side scan.
- **Comment** explaining why the guard is needed (AQE replan context).
- **Test** `post_phase_is_idempotent_on_hash_join` in `tests/physical_optimizer/filter_pushdown.rs`: builds a `HashJoinExec`, runs `FilterPushdown::new_post_optimization()` twice, and asserts structural equality via `get_plan_string`.

## Are these changes tested?

Yes. The new test fails without the fix (plan strings diverge due to duplicated dynamic filter predicates) and passes with it.

## Are there any user-facing changes?

No. Dynamic filter pushdown is an internal optimization; the idempotence guard only affects re-optimization scenarios (AQE).

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
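
The compounding described in the rationale can be reproduced in a std-only toy model. This is a sketch under stated assumptions, not DataFusion code: `DynamicFilter`, `Scan`, `Join`, `post_pass`, and `filters_after` are hypothetical stand-ins invented for illustration, and the `guarded` flag only mimics the `dynamic_filter.is_some()` check that the PR describes.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
#[derive(Debug)]
struct DynamicFilter;

/// Toy probe-side scan: its predicate is the AND of every pushed filter.
#[derive(Debug, Default)]
struct Scan {
    predicates: Vec<Arc<DynamicFilter>>,
}

/// Toy join that remembers the dynamic filter it installed.
#[derive(Debug, Default)]
struct Join {
    dynamic_filter: Option<Arc<DynamicFilter>>,
    probe: Scan,
}

impl Join {
    /// One Post-phase pass. With `guarded == false` this mimics the old
    /// behavior: always create a new filter and push it to the probe side.
    fn post_pass(&mut self, guarded: bool) {
        if guarded && self.dynamic_filter.is_some() {
            return; // a previous pass already wired the filter in
        }
        let filter = Arc::new(DynamicFilter);
        self.probe.predicates.push(Arc::clone(&filter));
        self.dynamic_filter = Some(filter);
    }
}

/// Runs `passes` Post-phase passes on a fresh join and returns how many
/// filters end up ANDed onto the probe-side scan.
fn filters_after(passes: usize, guarded: bool) -> usize {
    let mut join = Join::default();
    for _ in 0..passes {
        join.post_pass(guarded);
    }
    join.probe.predicates.len()
}

fn main() {
    println!("unguarded, 3 passes: {}", filters_after(3, false));
    println!("guarded, 3 passes: {}", filters_after(3, true));
}
```

In this model the unguarded variant grows by one predicate per pass, while the guarded variant stays at a single shared `Arc` no matter how often the pass is repeated.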
1 parent b6d4c25 commit 2e7b8e1

2 files changed

Lines changed: 46 additions & 1 deletion

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 39 additions & 0 deletions
@@ -3269,3 +3269,42 @@ fn test_filter_pushdown_through_sort_with_projection() {
     "
     );
 }
+
+/// `FilterPushdown::new_post_optimization()` must be idempotent. When applied
+/// to a HashJoinExec, the rule installs a dynamic filter on the probe-side
+/// scan; before the fix in `HashJoinExec::gather_filters_for_pushdown`, every
+/// invocation created a *new* `DynamicFilterPhysicalExpr` and ANDed it onto
+/// the probe side's existing predicate, producing
+/// `DynamicFilter AND DynamicFilter AND ...` after N passes.
+///
+/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every
+/// completed stage, so this would compound indefinitely without the guard.
+#[test]
+fn post_phase_is_idempotent_on_hash_join() {
+    use crate::physical_optimizer::test_utils::{hash_join_exec, parquet_exec, schema};
+    use datafusion_common::JoinType;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
+    use datafusion_physical_plan::get_plan_string;
+    use datafusion_physical_plan::joins::utils::JoinOn;
+
+    let s = schema();
+    let left = parquet_exec(Arc::clone(&s));
+    let right = parquet_exec(Arc::clone(&s));
+    let join_on: JoinOn = vec![(
+        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()),
+        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()),
+    )];
+    let plan = hash_join_exec(left, right, join_on, None, &JoinType::Inner).unwrap();
+
+    let config = ConfigOptions::new();
+    let rule = FilterPushdown::new_post_optimization();
+    let once = rule.optimize(plan, &config).unwrap();
+    let twice = rule.optimize(Arc::clone(&once), &config).unwrap();
+
+    assert_eq!(
+        get_plan_string(&once),
+        get_plan_string(&twice),
+        "second invocation of FilterPushdown::new_post_optimization mutated the plan",
+    );
+}
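
The test's strategy (apply the rule twice, then compare rendered plans) generalizes to any rewrite rule. The following is a minimal sketch of that pattern, assuming a plan can be cloned and rendered to a string. `check_idempotent`, `push_always`, `push_once`, and `render` are hypothetical helpers written for this illustration and are not part of DataFusion's test utilities.

```rust
/// Applies `rule` twice and compares the rendered results.
/// Returns both renderings in `Err` when the second application changed the plan.
fn check_idempotent<P: Clone>(
    plan: P,
    rule: impl Fn(P) -> P,
    render: impl Fn(&P) -> String,
) -> Result<(), (String, String)> {
    let once = rule(plan);
    let twice = rule(once.clone());
    let (first, second) = (render(&once), render(&twice));
    if first == second {
        Ok(())
    } else {
        Err((first, second))
    }
}

/// Toy rule (hypothetical): always appends another filter, like the old behavior.
fn push_always(mut preds: Vec<String>) -> Vec<String> {
    preds.push("DynamicFilter".to_string());
    preds
}

/// Toy rule (hypothetical): appends the filter only if none is present yet.
fn push_once(mut preds: Vec<String>) -> Vec<String> {
    if !preds.iter().any(|p| p == "DynamicFilter") {
        preds.push("DynamicFilter".to_string());
    }
    preds
}

/// Renders a toy predicate list the way a conjunction would be displayed.
fn render(preds: &Vec<String>) -> String {
    preds.join(" AND ")
}

fn main() {
    println!("{:?}", check_idempotent(Vec::<String>::new(), push_always, render));
    println!("{:?}", check_idempotent(Vec::<String>::new(), push_once, render));
}
```

Comparing rendered strings rather than object identity matches the approach the test takes with `get_plan_string`: a duplicated predicate shows up as a textual difference between the first and second pass.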

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 7 additions & 1 deletion
@@ -1635,8 +1635,14 @@ impl ExecutionPlan for HashJoinExec {
             ChildFilterDescription::all_unsupported(&parent_filters)
         };
 
-        // Add dynamic filters in Post phase if enabled
+        // Add dynamic filters in Post phase if enabled. Skip when this join
+        // already carries a dynamic filter from a previous pass — the shared
+        // `Arc<DynamicFilterPhysicalExpr>` is still wired into the probe-side
+        // scan's predicate, and re-creating it would AND a fresh duplicate
+        // onto every Post-phase invocation (apache/datafusion-ballista#1359
+        // surfaces this in AQE replan loops).
         if phase == FilterPushdownPhase::Post
+            && self.dynamic_filter.is_none()
             && self.allow_join_dynamic_filter_pushdown(config)
         {
             // Add actual dynamic filter to right side (probe side)
