Skip to content

Commit f6e8d68

Browse files
Dandandan and claude committed
simplify: trim verbose comments, restore #8296 link, tidy loop
- Cut WHAT-comments and PR-message-style prose from the diff; keep only non-obvious WHY (the #8296 link is back on the merging-beneficial guard, one-line note on why we hand the unified projection back to `remove_unnecessary_projections` for leaf absorption).
- Drop the orphaned `/// Unifies projection with its input ...` doc line left over from the removed `try_unifying_projections`.
- Hoist `column_ref_map` out of the per-iteration allocation and `.clear()` it each step.
- Replace the `can_unify` flag with a labeled `break 'outer` and destructure `Transformed { data, transformed, .. }` in the optimizer-side loop.

No behaviour change. All projection unit + integration tests still pass (24 + 54 + 23).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent eb4ac80 commit f6e8d68

3 files changed

Lines changed: 33 additions & 71 deletions

File tree

datafusion/core/benches/sql_planner_extended.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,9 @@ fn build_non_case_left_join_df_with_push_down_filter(
324324
rt.block_on(async { ctx.sql(&query).await.unwrap() })
325325
}
326326

327-
/// Build a query of the shape:
328-
/// join + wide-OR filter + N chained CTEs, each adding one column
329-
/// defined by a depth-K nested CASE ladder over the same input column.
330-
///
331-
/// The chained projections force the physical planner to walk a stack of
332-
/// `ProjectionExec`s whose expression trees all reference the same upstream
333-
/// column, which exercises the `ProjectionPushdown` physical rule heavily.
327+
/// Join + wide-OR filter + N chained CTEs, each adding one column defined
328+
/// by a depth-K nested CASE ladder over the same input column. Exercises
329+
/// the physical `ProjectionPushdown` rule on long projection chains.
334330
fn build_chained_case_projection_query(
335331
chained_steps: usize,
336332
case_depth: usize,
@@ -516,9 +512,6 @@ fn criterion_benchmark(c: &mut Criterion) {
516512
}
517513
control_group.finish();
518514

519-
// Hot-spot bench: N chained CTE projections, each adding one column
520-
// defined by a depth-K nested CASE ladder, on top of a join + wide-OR
521-
// filter. Exercises the physical `ProjectionPushdown` rule.
522515
let chained_df = build_chained_case_projection_df(&rt, 80, 23, 30);
523516
c.bench_function(
524517
"physical_plan_chained_case_projection_hotspot",

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -536,26 +536,25 @@ fn optimize_subqueries(
536536
/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place).
537537
/// - `Err(error)`: An error occurred during the function call.
538538
fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
539-
// Iteratively merge as long as the current Projection's input is another
540-
// Projection that can be folded. Without this, a chain of N consecutive
541-
// projections (e.g. SQL of the form `SELECT *, <expr> AS dN FROM ...`
542-
// stacked N times) requires N applications of this rule (re-run by the
543-
// outer optimizer loop, each walking the full plan tree) before the
544-
// chain is fully collapsed.
539+
// Collapse the whole chain in one pass; otherwise an N-deep chain needs
540+
// N outer optimizer passes to fully fold.
545541
let mut current = proj;
546542
let mut transformed_any = false;
547543
loop {
548-
let result = merge_consecutive_projections_one_level(current)?;
549-
current = result.data;
550-
if !result.transformed {
551-
return Ok(if transformed_any {
552-
Transformed::yes(current)
553-
} else {
554-
Transformed::no(current)
555-
});
544+
let Transformed {
545+
data, transformed, ..
546+
} = merge_consecutive_projections_one_level(current)?;
547+
current = data;
548+
if !transformed {
549+
break;
556550
}
557551
transformed_any = true;
558552
}
553+
Ok(if transformed_any {
554+
Transformed::yes(current)
555+
} else {
556+
Transformed::no(current)
557+
})
559558
}
560559

561560
fn merge_consecutive_projections_one_level(

datafusion/physical-plan/src/projection.rs

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -382,12 +382,6 @@ impl ExecutionPlan for ProjectionExec {
382382
&self,
383383
projection: &ProjectionExec,
384384
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
385-
// Collapse the entire run of consecutive `ProjectionExec`s in a single
386-
// pass instead of unifying one level at a time. The pairwise approach
387-
// had to allocate (and recompute equivalence properties for) one
388-
// intermediate `ProjectionExec` per level, which is the dominant cost
389-
// for plans with many chained projections (e.g. a long pipeline of
390-
// `SELECT *, <expr> AS dN FROM ...`).
391385
match try_collapse_projection_chain(projection)? {
392386
Some(plan) => Ok(Some(plan)),
393387
None => Ok(Some(Arc::new(projection.clone()))),
@@ -1017,37 +1011,25 @@ pub fn update_join_filter(
10171011
})
10181012
}
10191013

1020-
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
1021-
/// Iteratively collapse a chain of consecutive [`ProjectionExec`]s starting
1022-
/// at `outer` into a single `ProjectionExec`. Returns `None` if the very
1023-
/// first level cannot be unified (matching the previous behavior of
1024-
/// [`try_unifying_projections`] + recursive `remove_unnecessary_projections`).
1025-
///
1026-
/// Functionally equivalent to repeatedly applying [`try_unifying_projections`]
1027-
/// and re-entering [`remove_unnecessary_projections`] on the result, but
1028-
/// avoids constructing an intermediate `ProjectionExec` (and recomputing its
1029-
/// [`PlanProperties`] / equivalence properties) for every level it walks
1030-
/// through. For long projection chains the saved work scales linearly with
1031-
/// the chain length.
1014+
/// Collapse a chain of consecutive [`ProjectionExec`]s into one. Returns
1015+
/// `None` if nothing could be merged.
10321016
fn try_collapse_projection_chain(
10331017
outer: &ProjectionExec,
10341018
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
10351019
let mut current_exprs: Vec<ProjectionExpr> = outer.expr().to_vec();
10361020
let mut current_input: Arc<dyn ExecutionPlan> = Arc::clone(outer.input());
1021+
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
10371022
let mut collapsed_any = false;
10381023

1039-
loop {
1040-
// Only continue if the current input is itself a projection.
1024+
'outer: loop {
10411025
let Some(inner_proj) = current_input.downcast_ref::<ProjectionExec>()
10421026
else {
10431027
break;
10441028
};
10451029

1046-
// Replicate the "merging is beneficial" guard from
1047-
// `try_unifying_projections`: if any column referenced more than once
1048-
// in `current_exprs` resolves to a non-trivial expression in the
1049-
// inner projection, fusing them would duplicate that computation.
1050-
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1030+
// Merging a referenced-more-than-once column whose inner expression is
1031+
// non-trivial would duplicate that computation. See #8296.
1032+
column_ref_map.clear();
10511033
for proj_expr in &current_exprs {
10521034
proj_expr.expr.apply(|expr| {
10531035
if let Some(column) = expr.downcast_ref::<Column>() {
@@ -1068,24 +1050,15 @@ fn try_collapse_projection_chain(
10681050
break;
10691051
}
10701052

1071-
// Substitute each outer expression through the inner projection.
1072-
let mut new_exprs: Vec<ProjectionExpr> =
1073-
Vec::with_capacity(current_exprs.len());
1074-
let mut can_unify = true;
1053+
let mut new_exprs = Vec::with_capacity(current_exprs.len());
10751054
for proj_expr in &current_exprs {
1076-
match update_expr(&proj_expr.expr, inner_exprs, true)? {
1077-
Some(expr) => new_exprs.push(ProjectionExpr {
1078-
expr,
1079-
alias: proj_expr.alias.clone(),
1080-
}),
1081-
None => {
1082-
can_unify = false;
1083-
break;
1084-
}
1085-
}
1086-
}
1087-
if !can_unify {
1088-
break;
1055+
let Some(expr) = update_expr(&proj_expr.expr, inner_exprs, true)? else {
1056+
break 'outer;
1057+
};
1058+
new_exprs.push(ProjectionExpr {
1059+
expr,
1060+
alias: proj_expr.alias.clone(),
1061+
});
10891062
}
10901063

10911064
current_exprs = new_exprs;
@@ -1097,11 +1070,8 @@ fn try_collapse_projection_chain(
10971070
return Ok(None);
10981071
}
10991072

1100-
// After collapsing the projection chain, hand the unified projection back
1101-
// to `remove_unnecessary_projections` once. This gives the non-Projection
1102-
// input (e.g. `DataSourceExec`) the chance to absorb the projection via
1103-
// its own `try_swapping_with_projection` impl — the same behaviour the
1104-
// pairwise recursion used to provide for the final step.
1073+
// Give the (now non-Projection) input a chance to absorb the unified
1074+
// projection via its own `try_swapping_with_projection` impl.
11051075
let unified: Arc<dyn ExecutionPlan> =
11061076
Arc::new(ProjectionExec::try_new(current_exprs, current_input)?);
11071077
remove_unnecessary_projections(unified).data().map(Some)

0 commit comments

Comments
 (0)