Commit e292f33

perf(physical-optimizer): skip ensure_distribution rebuild when children are unchanged (#22521)
## Which issue does this PR close?

- Closes #22520.

## Rationale for this change

`ensure_distribution` in `datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs` unconditionally calls `plan.with_new_children(children_plans)` after collecting the (possibly redistributed) children, even when none of those children were actually replaced. For nodes like `ProjectionExec`, that path runs through `try_new` and recomputes the schema, equivalence properties, output ordering, and output partitioning, then allocates a new `Arc<dyn ExecutionPlan>`. When every child Arc is pointer-identical to the input, that work produces a logically identical node, which is pure overhead.

The cost is amplified by two factors:

1. **Plan depth.** Workloads dominated by point queries (no join / aggregate / unmet ordering, i.e. nothing for `ensure_distribution` to inject a `RepartitionExec` or `SortExec` for) hit this wasted rebuild at every node in the plan. A 5–30 deep `ProjectionExec` stack pays the cost N times.
2. **Schema width.** Most steps inside `ProjectionExec::try_new` are `O(num_columns)`: per-column `data_type` / `nullable` lookup to build the new schema, per-column remapping of equivalence classes through the projection mapping, and per-column lookup when rewriting `PhysicalSortExpr`s into the output ordering. Wide schemas (tens of columns) make every wasted rebuild proportionally heavier.

Profiling a production point-query workload (wide schemas, deep `ProjectionExec` stacks) showed `ProjectionExec::with_new_children` as the single largest cost inside `ensure_distribution`:

- `ensure_distribution` total: 2.87s of a 60s CPU sample
- `ProjectionExec::with_new_children`: 1.94s (56% of the rule)
- `SortExec::with_new_children`: 0.11s
- Other ExecutionPlan nodes: 0.82s

## What changes are included in this PR?

After collecting `children_plans`, compare each new child Arc with the original via `Arc::ptr_eq`. When every child is unchanged, reuse the existing `plan` Arc and skip `with_new_children`. The `UnionExec` to `InterleaveExec` special case still runs first because it intentionally produces a new node even when child Arcs are unchanged.

This relies on the fact that `ensure_distribution` already produces pointer-identical Arcs for children that need no redistribution (it threads the original Arc through unchanged), so `Arc::ptr_eq` precisely distinguishes "rewritten" from "untouched" children at O(1) per child.

## Are these changes tested?

Yes. The existing `enforce_distribution` suite passes unchanged (66/66):

```
cargo test --release -p datafusion --test core_integration -- physical_optimizer::enforce_distribution
```

The behavior is observable only as a CPU reduction; correctness is preserved because `ExecutionPlan` nodes are immutable, so reusing the original Arc produces the same plan tree as `with_new_children(unchanged_children)` would have, just without the schema / ordering / equivalence / partitioning recomputation.

## Are there any user-facing changes?

No. Same plans, lower planning time.

## Micro-benchmark

Plan shape: 30-deep `ProjectionExec` stack over a sorted parquet scan, 5000 iterations.

- Without fix: 852.74 ms total, 170.55 us/call
- With fix: 296.81 ms total, 59.36 us/call
- ~2.87x speedup, -65% CPU per call

Wider schemas (more projection expressions per node) widen the gap further because each skipped `with_new_children` avoids more O(num_columns) work.
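The per-call and speedup figures in the micro-benchmark follow directly from the two totals and the iteration count. As a quick sanity check, the arithmetic can be reproduced with a few lines of Rust; the helper names below (`per_call_us`, `speedup`, `reduction_pct`) are made up for this illustration and are not part of the PR or of DataFusion.

```rust
/// Per-call latency in microseconds, given a total in milliseconds.
/// (Hypothetical helper, for illustration only.)
fn per_call_us(total_ms: f64, iterations: u32) -> f64 {
    total_ms * 1000.0 / f64::from(iterations)
}

/// Speedup factor: how many times faster `after_ms` is than `before_ms`.
fn speedup(before_ms: f64, after_ms: f64) -> f64 {
    before_ms / after_ms
}

/// Percentage reduction in cost when going from `before_ms` to `after_ms`.
fn reduction_pct(before_ms: f64, after_ms: f64) -> f64 {
    (1.0 - after_ms / before_ms) * 100.0
}

fn main() {
    // Totals and iteration count as reported in the micro-benchmark section.
    let (before_ms, after_ms, iterations) = (852.74, 296.81, 5000);

    println!("without fix: {:.2} us/call", per_call_us(before_ms, iterations));
    println!("with fix:    {:.2} us/call", per_call_us(after_ms, iterations));
    println!("speedup:     {:.2}x", speedup(before_ms, after_ms));
    println!("reduction:   {:.0}%", reduction_pct(before_ms, after_ms));
}
```

The computed values agree with the reported 170.55 us/call, 59.36 us/call, ~2.87x, and -65%.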
1 parent 11a79a6 commit e292f33

2 files changed

Lines changed: 48 additions & 2 deletions

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 35 additions & 0 deletions
@@ -3971,3 +3971,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> {
     );
     Ok(())
 }
+
+/// Verifies the `ensure_distribution` fast path: when no child of a node is
+/// replaced (no `RepartitionExec` or `SortExec` injection is required),
+/// the rule must reuse the input `Arc<dyn ExecutionPlan>` unchanged instead
+/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a
+/// single-partition scan with `target_partitions = 1`, every node hits this
+/// fast path, so the root returned by `ensure_distribution` must be the
+/// same `Arc` as the input.
+///
+/// Regression test for the optimization that avoids
+/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence
+/// properties, output ordering, and partitioning) on the common point-query
+/// plan shape.
+#[test]
+fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> {
+    let scan = parquet_exec();
+    let proj1 = projection_exec_with_alias(
+        scan,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "b".to_string()),
+        ],
+    );
+    let proj2 =
+        projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]);
+    let plan: Arc<dyn ExecutionPlan> = proj2;
+
+    let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?;
+
+    assert!(
+        Arc::ptr_eq(&result, &plan),
+        "ensure_distribution must reuse the input Arc when no children require redistribution"
+    );
+    Ok(())
+}
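The test above asserts identity rather than equality: `Arc::ptr_eq` is true only when both handles point at the same allocation. A minimal sketch of that distinction follows, using a toy `Node` trait and `Projection` struct that are assumptions made for this example and not DataFusion types.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a plan node; not DataFusion's `ExecutionPlan`.
trait Node {
    fn name(&self) -> &str;
}

struct Projection {
    name: String,
}

impl Node for Projection {
    fn name(&self) -> &str {
        &self.name
    }
}

/// Builds a node in a fresh allocation.
fn build(name: &str) -> Arc<dyn Node> {
    Arc::new(Projection { name: name.to_string() })
}

/// A pass that changes nothing and hands back the same allocation.
fn pass_through(plan: Arc<dyn Node>) -> Arc<dyn Node> {
    plan
}

/// A pass that changes nothing logically but still allocates a new node.
fn rebuild(plan: Arc<dyn Node>) -> Arc<dyn Node> {
    build(plan.name())
}

fn main() {
    let plan = build("proj");
    let same = pass_through(Arc::clone(&plan));
    let copy = rebuild(Arc::clone(&plan));

    // Same allocation: this is the property the regression test checks.
    assert!(Arc::ptr_eq(&plan, &same));
    // Logically identical content, but a different allocation.
    assert!(!Arc::ptr_eq(&plan, &copy));
    assert_eq!(plan.name(), copy.name());
}
```

Because a rebuilt node never shares its allocation with the input, a pointer comparison at the root is enough to detect whether any rebuild happened on the way up a single-child chain.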

datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs

Lines changed: 13 additions & 2 deletions
@@ -65,7 +65,9 @@ use datafusion_physical_plan::tree_node::PlanContext;
 use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
 use datafusion_physical_plan::windows::WindowAggExec;
 use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
-use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use datafusion_physical_plan::{
+    Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary,
+};

 use itertools::izip;

@@ -1362,7 +1364,16 @@ pub fn ensure_distribution(
         // Data
         Arc::new(InterleaveExec::try_new(children_plans)?)
     } else {
-        plan.with_new_children(children_plans)?
+        // Route through `with_new_children_if_necessary` so the common
+        // case where no child was replaced above skips the expensive
+        // `with_new_children` rebuild. For nodes like `ProjectionExec`,
+        // `with_new_children` recomputes schema / equivalence properties /
+        // output ordering via `try_new` even when the input Arcs are
+        // identical, which dominates `ensure_distribution` time on deep
+        // projection stacks over plans where no distribution change
+        // applies (point queries with no join / aggregate / unmet
+        // ordering).
+        with_new_children_if_necessary(plan, children_plans)?
     };

     Ok(Transformed::yes(DistributionContext::new(
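To see why the one-line change matters on deep plans, the following toy model counts rebuilds for a 30-deep single-child chain with and without a pointer-equality fast path. It is only a sketch under stated assumptions: `ToyPlan`, `reuse_if_unchanged`, `rewrite`, and `chain` are hypothetical names invented here, and the helper mirrors the idea described in the PR rather than DataFusion's actual `with_new_children_if_necessary` implementation.

```rust
use std::sync::Arc;

/// Toy plan node (hypothetical; not DataFusion's `ExecutionPlan`).
struct ToyPlan {
    label: String,
    children: Vec<Arc<ToyPlan>>,
}

impl ToyPlan {
    /// Stand-in for an expensive `with_new_children`: always allocates a
    /// new node and records the rebuild in `rebuilds`.
    fn with_new_children(
        &self,
        children: Vec<Arc<ToyPlan>>,
        rebuilds: &mut usize,
    ) -> Arc<ToyPlan> {
        *rebuilds += 1;
        Arc::new(ToyPlan { label: self.label.clone(), children })
    }
}

/// Reuses `plan` when every new child is pointer-identical to the old one;
/// otherwise falls back to the rebuilding path.
fn reuse_if_unchanged(
    plan: Arc<ToyPlan>,
    children: Vec<Arc<ToyPlan>>,
    rebuilds: &mut usize,
) -> Arc<ToyPlan> {
    let unchanged = plan.children.len() == children.len()
        && plan
            .children
            .iter()
            .zip(&children)
            .all(|(old, new)| Arc::ptr_eq(old, new));
    if unchanged {
        plan
    } else {
        plan.with_new_children(children, rebuilds)
    }
}

/// Bottom-up pass that never replaces a child, like a rule with nothing to do.
fn rewrite(plan: Arc<ToyPlan>, fast_path: bool, rebuilds: &mut usize) -> Arc<ToyPlan> {
    let children: Vec<Arc<ToyPlan>> = plan
        .children
        .iter()
        .map(|child| rewrite(Arc::clone(child), fast_path, &mut *rebuilds))
        .collect();
    if fast_path {
        reuse_if_unchanged(plan, children, rebuilds)
    } else {
        plan.with_new_children(children, rebuilds)
    }
}

/// Builds a chain of `depth` single-child nodes over one leaf.
fn chain(depth: usize) -> Arc<ToyPlan> {
    let mut plan = Arc::new(ToyPlan { label: "scan".to_string(), children: vec![] });
    for i in 0..depth {
        plan = Arc::new(ToyPlan { label: format!("proj{}", i), children: vec![plan] });
    }
    plan
}

fn main() {
    let plan = chain(30);
    let (mut slow, mut fast) = (0, 0);
    let rebuilt = rewrite(Arc::clone(&plan), false, &mut slow);
    let reused = rewrite(Arc::clone(&plan), true, &mut fast);

    println!("rebuilds without fast path: {}", slow);
    println!("rebuilds with fast path:    {}", fast);
    assert!(!Arc::ptr_eq(&plan, &rebuilt));
    assert!(Arc::ptr_eq(&plan, &reused));
}
```

In this toy model the unconditional path rebuilds all 31 nodes (30 projections plus the leaf), while the fast path rebuilds none and returns the original root. The real saving per node depends on how much work the node's constructor does, which is why the PR singles out `ProjectionExec`.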
