Skip to content

Commit fa187fc

Browse files
authored
[branch-53] perf: skip ensure_distribution rebuild when children are unchanged (#58)
1 parent d66824f commit fa187fc

2 files changed

Lines changed: 48 additions & 2 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3695,3 +3695,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> {
36953695
);
36963696
Ok(())
36973697
}
3698+
3699+
/// Verifies the `ensure_distribution` fast path: when no child of a node is
3700+
/// replaced (no `RepartitionExec` or `SortExec` injection is required),
3701+
/// the rule must reuse the input `Arc<dyn ExecutionPlan>` unchanged instead
3702+
/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a
3703+
/// single-partition scan with `target_partitions = 1`, every node hits this
3704+
/// fast path, so the root returned by `ensure_distribution` must be the
3705+
/// same `Arc` as the input.
3706+
///
3707+
/// Regression test for the optimization that avoids
3708+
/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence
3709+
/// properties, output ordering, and partitioning) on the common point-query
3710+
/// plan shape. Cherry-pick of upstream apache/datafusion#22521.
3711+
#[test]
3712+
fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> {
3713+
let scan = parquet_exec();
3714+
let proj1 = projection_exec_with_alias(
3715+
scan,
3716+
vec![
3717+
("a".to_string(), "a".to_string()),
3718+
("b".to_string(), "b".to_string()),
3719+
],
3720+
);
3721+
let proj2 =
3722+
projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]);
3723+
let plan: Arc<dyn ExecutionPlan> = proj2;
3724+
3725+
let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?;
3726+
3727+
assert!(
3728+
Arc::ptr_eq(&result, &plan),
3729+
"ensure_distribution must reuse the input Arc when no children require redistribution"
3730+
);
3731+
Ok(())
3732+
}

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ use datafusion_physical_plan::tree_node::PlanContext;
5858
use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave};
5959
use datafusion_physical_plan::windows::WindowAggExec;
6060
use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window};
61-
use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};
61+
use datafusion_physical_plan::{
62+
Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary,
63+
};
6264

6365
use itertools::izip;
6466

@@ -1429,7 +1431,16 @@ pub fn ensure_distribution(
14291431
// Data
14301432
Arc::new(InterleaveExec::try_new(children_plans)?)
14311433
} else {
1432-
plan.with_new_children(children_plans)?
1434+
// Route through `with_new_children_if_necessary` so the common
1435+
// case where no child was replaced above skips the expensive
1436+
// `with_new_children` rebuild. For nodes like `ProjectionExec`,
1437+
// `with_new_children` recomputes schema / equivalence properties /
1438+
// output ordering via `try_new` even when the input Arcs are
1439+
// identical, which dominates `ensure_distribution` time on deep
1440+
// projection stacks over plans where no distribution change
1441+
// applies (point queries with no join / aggregate / unmet
1442+
// ordering). Cherry-pick of upstream apache/datafusion#22521.
1443+
with_new_children_if_necessary(plan, children_plans)?
14331444
};
14341445

14351446
Ok(Transformed::yes(DistributionContext::new(

0 commit comments

Comments
 (0)