Commit d9ea38b

wirybeaver and claude authored
fix(physical-optimizer): make OutputRequirements idempotent (#22522)
## Which issue does this PR close?

Related to apache/datafusion-ballista#1359

## Rationale

Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full `PhysicalOptimizer` chain after every completed stage (`AdaptivePlanner::replan_stages`). Rules that are not idempotent (`rule(rule(x)) != rule(x)`) stack execution-plan nodes on each pass.

`OutputRequirements::new_add_mode()` wraps the plan root with `OutputRequirementExec` to preserve global ordering/distribution requirements. On a second pass the wrapper's `maintains_input_order() == [true]` and `required_input_ordering() == [None]` cause `require_top_ordering_helper` to recurse through it and produce a *second* wrapper, yielding `OutputRequirementExec(OutputRequirementExec(...))`. Each AQE replan adds another layer.

## What changes are included in this PR?

- **Guard in `require_top_ordering()`**: if the plan root is already an `OutputRequirementExec`, return it unchanged. This makes the rule idempotent with zero overhead for single-pass use.
- **Doc-comment update** on `new_add_mode()` and `require_top_ordering()` documenting the idempotence guarantee.
- **Two tests** in `tests/physical_optimizer/output_requirements.rs`:
  - `add_mode_is_idempotent_on_bare_scan`: bare `ParquetExec` (exercises `is_changed = false` path).
  - `add_mode_is_idempotent_on_sorted_plan`: `SortExec → ParquetExec` (exercises `is_changed = true` path).

## Are these changes tested?

Yes. Two new tests run the rule twice on distinct fixtures and assert structural equality via `get_plan_string`. Both fail without the fix (double-wrapped `OutputRequirementExec`) and pass with it.

## Are there any user-facing changes?

No. `OutputRequirementExec` is an internal ancillary node stripped before execution; the idempotence guard only affects re-optimization scenarios (AQE).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
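The stacking behaviour described in the rationale can be illustrated with a toy model. This is only a sketch under stated assumptions: `Plan`, `add_wrapper_naive`, `add_wrapper_guarded`, `wrapper_depth`, and `replan` are hypothetical names invented for illustration and are not DataFusion or Ballista APIs. The naive rule always wraps the root, while the guarded rule returns an already-wrapped root unchanged.

```rust
// Toy model only: every name below is hypothetical, not a DataFusion API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Sort(Box<Plan>),
    OutputRequirement(Box<Plan>),
}

/// Non-idempotent variant: always wraps the root, so each pass adds a layer.
fn add_wrapper_naive(plan: Plan) -> Plan {
    Plan::OutputRequirement(Box::new(plan))
}

/// Guarded variant: a root that is already a wrapper is returned unchanged.
fn add_wrapper_guarded(plan: Plan) -> Plan {
    if matches!(plan, Plan::OutputRequirement(_)) {
        return plan;
    }
    Plan::OutputRequirement(Box::new(plan))
}

/// Counts consecutive wrapper layers at the root of the plan.
fn wrapper_depth(plan: &Plan) -> usize {
    match plan {
        Plan::OutputRequirement(inner) => 1 + wrapper_depth(inner),
        _ => 0,
    }
}

/// Simulates `passes` optimizer runs, as repeated replanning would do.
fn replan(mut plan: Plan, passes: usize, rule: fn(Plan) -> Plan) -> Plan {
    for _ in 0..passes {
        plan = rule(plan);
    }
    plan
}
```

In this model, running `replan` with `add_wrapper_naive` grows `wrapper_depth` by one per pass, whereas `add_wrapper_guarded` keeps it at one no matter how many passes run.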
1 parent fa724c1 commit d9ea38b

3 files changed

Lines changed: 77 additions & 1 deletion

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ mod join_selection;
 #[expect(clippy::needless_pass_by_value)]
 mod limit_pushdown;
 mod limited_distinct_aggregation;
+mod output_requirements;
 mod partition_statistics;
 mod projection_pushdown;
 mod pushdown_sort;

datafusion/core/tests/physical_optimizer/output_requirements.rs

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::physical_optimizer::test_utils::{parquet_exec, schema, sort_exec, sort_expr};
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::output_requirements::OutputRequirements;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::get_plan_string;
+
+/// `OutputRequirements::new_add_mode()` must be idempotent: re-applying it to
+/// its own output must not stack additional `OutputRequirementExec` wrappers.
+///
+/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every
+/// completed stage; without this guarantee, every replan adds another wrapper.
+#[test]
+fn add_mode_is_idempotent_on_bare_scan() {
+    // Exercises the path where `require_top_ordering_helper` returns
+    // `is_changed = false` and the rule adds a default (empty-requirement)
+    // wrapper.
+    assert_add_mode_idempotent(parquet_exec(schema()));
+}
+
+#[test]
+fn add_mode_is_idempotent_on_sorted_plan() {
+    // Exercises the path where the helper recognizes a top-level `SortExec`
+    // and produces a wrapper carrying that ordering requirement
+    // (`is_changed = true` branch).
+    let s = schema();
+    let ordering: LexOrdering = [sort_expr("a", &s)].into();
+    let plan = sort_exec(ordering, parquet_exec(Arc::clone(&s)));
+    assert_add_mode_idempotent(plan);
+}
+
+fn assert_add_mode_idempotent(plan: Arc<dyn ExecutionPlan>) {
+    let config = ConfigOptions::new();
+    let rule = OutputRequirements::new_add_mode();
+
+    let once = rule.optimize(plan, &config).unwrap();
+    let twice = rule.optimize(Arc::clone(&once), &config).unwrap();
+
+    assert_eq!(
+        get_plan_string(&once),
+        get_plan_string(&twice),
+        "second invocation of OutputRequirements::new_add_mode mutated the plan",
+    );
+}
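The test helper above follows a general pattern: apply a transformation twice and compare a rendered form of both results, which is useful when the value itself does not implement `PartialEq`. A minimal, dependency-free sketch of that pattern follows. `check_idempotent` and its `render` parameter are hypothetical names for illustration and are not part of DataFusion's test utilities; `render` merely plays the role that `get_plan_string` plays in the test.

```rust
/// Applies `rule` twice and compares the rendered form of both results.
/// Hypothetical helper for illustration; not a DataFusion test utility.
fn check_idempotent<T, F, R>(input: T, rule: F, render: R) -> Result<(), String>
where
    T: Clone,
    F: Fn(T) -> T,
    R: Fn(&T) -> Vec<String>,
{
    let once = rule(input);
    let twice = rule(once.clone());
    let (first, second) = (render(&once), render(&twice));
    if first == second {
        Ok(())
    } else {
        // Report both renderings so a failure shows exactly what changed.
        Err(format!("second pass changed the value: {first:?} -> {second:?}"))
    }
}
```

Comparing rendered strings trades precision for convenience: two values that differ only in details the renderer omits would still compare equal, so the renderer should expose every property the rule can modify.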

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 11 additions & 1 deletion
@@ -61,7 +61,9 @@ impl OutputRequirements {
     /// Create a new rule which works in `Add` mode; i.e. it simply adds a
     /// top-level [`OutputRequirementExec`] into the physical plan to keep track
     /// of global ordering and distribution requirements if there are any.
-    /// Note that this rule should run at the beginning.
+    /// Note that this rule should run at the beginning. It is idempotent: when
+    /// invoked on a plan that is already topped by an `OutputRequirementExec`,
+    /// it returns the plan unchanged.
     pub fn new_add_mode() -> Self {
         Self {
             mode: RuleMode::Add,
@@ -325,7 +327,15 @@ impl PhysicalOptimizerRule for OutputRequirements {
 
 /// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that
 /// global requirements are not lost during optimization.
+///
+/// Idempotent: if the plan is already topped by an `OutputRequirementExec`, it
+/// is returned unchanged so that re-running this rule (as adaptive execution
+/// in datafusion-ballista AQE does after every completed stage, see
+/// datafusion-ballista#1359) does not stack wrappers.
 fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+    if plan.downcast_ref::<OutputRequirementExec>().is_some() {
+        return Ok(plan);
+    }
     let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
     if is_changed {
         Ok(new_plan)
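The guard relies on recovering a concrete node type from a trait object. The following standalone sketch shows one way such a check can work using only `std::any::Any`. It is an illustration under assumptions, not DataFusion's implementation: `Node`, `Scan`, `Wrapper`, `wrap_once`, and `depth` are hypothetical stand-ins, and the explicit `as_any` accessor is an assumption of this sketch, whereas the commit calls `downcast_ref` on the plan directly.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for a plan-node trait; not DataFusion's `ExecutionPlan`.
trait Node {
    /// Exposes the node as `Any` so callers can test for a concrete type.
    fn as_any(&self) -> &dyn Any;
}

struct Scan;

struct Wrapper {
    input: Arc<dyn Node>,
}

impl Node for Scan {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Node for Wrapper {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Wraps `plan` unless its root is already a `Wrapper`.
fn wrap_once(plan: Arc<dyn Node>) -> Arc<dyn Node> {
    if plan.as_any().downcast_ref::<Wrapper>().is_some() {
        return plan;
    }
    Arc::new(Wrapper { input: plan })
}

/// Number of consecutive `Wrapper` layers at the root.
fn depth(plan: &Arc<dyn Node>) -> usize {
    match plan.as_any().downcast_ref::<Wrapper>() {
        Some(wrapper) => 1 + depth(&wrapper.input),
        None => 0,
    }
}
```

Because the early return hands back the same `Arc`, a second call to `wrap_once` allocates nothing and leaves `depth` at one, which mirrors the "returned unchanged" wording in the doc comment.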
