Skip to content

Commit 4d5595f

Browse files
Dandandanclaude
andcommitted
perf: collapse chained projections in a single pass
Long chains of `ProjectionExec`s (and their `LogicalPlan::Projection` counterparts) — produced by e.g. a pipeline of `SELECT *, <expr> AS dN FROM ...` stacked N times — were being collapsed one level at a time, forcing O(N) intermediate `ProjectionExec` constructions (each recomputing equivalence properties through the projection mapping) and O(N) re-runs of the logical `OptimizeProjections` rule. Physical: replace the pairwise recursion in `ProjectionExec::try_swapping_with_projection` with a single iterative chain collapse that walks all consecutive `ProjectionExec`s and builds only one final `ProjectionExec`. Leaf pushdown into a non-`Projection` input (e.g. `DataSourceExec`) is preserved by handing the unified projection back to `remove_unnecessary_projections` once at the end. Logical: wrap `merge_consecutive_projections` in an internal loop so an N-deep chain folds in a single rule application instead of waiting for the outer fixpoint optimizer to re-run the rule N times. Bench (`physical_plan_chained_case_projection_hotspot`, N=80 chained projections, depth-23 nested CASE per step, 30-wide OR filter on top of a LEFT JOIN): - before: 623 ms - after : 364 ms (-41.5%, criterion p<0.05, CI [-41.83%, -41.33%]) At the parser-depth limit (depth=45) the bench measures ~1.5 s/iter post-fix — the same shape was multi-second to several-second territory on the prior code. Verification: 24/24 physical-plan projection unit tests pass; 54/54 optimizer projection tests; 23/23 `physical_optimizer::projection_pushdown` integration tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e80076b commit 4d5595f

3 files changed

Lines changed: 209 additions & 49 deletions

File tree

datafusion/core/benches/sql_planner_extended.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,61 @@ fn build_non_case_left_join_df_with_push_down_filter(
324324
rt.block_on(async { ctx.sql(&query).await.unwrap() })
325325
}
326326

327+
/// Build a query of the shape:
328+
/// join + wide-OR filter + N chained CTEs, each adding one column
329+
/// defined by a depth-K nested CASE ladder over the same input column.
330+
///
331+
/// The chained projections force the physical planner to walk a stack of
332+
/// `ProjectionExec`s whose expression trees all reference the same upstream
333+
/// column, which exercises the `ProjectionPushdown` physical rule heavily.
334+
fn build_chained_case_projection_query(
335+
chained_steps: usize,
336+
case_depth: usize,
337+
or_width: usize,
338+
) -> String {
339+
let mut q = String::new();
340+
q.push_str("WITH s0 AS (\n SELECT l.c0, l.c1 FROM t l LEFT JOIN t r ON l.c0 = r.c0");
341+
if or_width > 0 {
342+
q.push_str("\n WHERE (");
343+
for i in 0..or_width {
344+
if i > 0 {
345+
q.push_str(" OR ");
346+
}
347+
let _ = write!(&mut q, "l.c1 = '{i}'");
348+
}
349+
q.push(')');
350+
}
351+
q.push_str("\n)");
352+
353+
for n in 1..=chained_steps {
354+
q.push_str(",\n");
355+
let _ = write!(&mut q, "s{n} AS (SELECT *, ");
356+
for d in 0..case_depth {
357+
let _ = write!(&mut q, "CASE WHEN c0 = '{d}' THEN 'label' ELSE ");
358+
}
359+
q.push_str("c0");
360+
for _ in 0..case_depth {
361+
q.push_str(" END");
362+
}
363+
let _ = write!(&mut q, " AS d{n} FROM s{prev})", prev = n - 1);
364+
}
365+
366+
let _ = write!(&mut q, "\nSELECT * FROM s{chained_steps}");
367+
q
368+
}
369+
370+
fn build_chained_case_projection_df(
371+
rt: &Runtime,
372+
chained_steps: usize,
373+
case_depth: usize,
374+
or_width: usize,
375+
) -> DataFrame {
376+
let ctx = SessionContext::new();
377+
register_string_table(&ctx, 100, 1000);
378+
let query = build_chained_case_projection_query(chained_steps, case_depth, or_width);
379+
rt.block_on(async { ctx.sql(&query).await.unwrap() })
380+
}
381+
327382
fn criterion_benchmark(c: &mut Criterion) {
328383
let baseline_ctx = SessionContext::new();
329384
let case_heavy_ctx = SessionContext::new();
@@ -460,6 +515,44 @@ fn criterion_benchmark(c: &mut Criterion) {
460515
}
461516
}
462517
control_group.finish();
518+
519+
// Hot-spot bench: N chained CTE projections, each adding one column
520+
// defined by a depth-K nested CASE ladder, on top of a join + wide-OR
521+
// filter. Exercises the physical `ProjectionPushdown` rule.
522+
let chained_df = build_chained_case_projection_df(&rt, 80, 23, 30);
523+
c.bench_function(
524+
"physical_plan_chained_case_projection_hotspot",
525+
|b| {
526+
b.iter(|| {
527+
let df_clone = chained_df.clone();
528+
black_box(rt.block_on(async {
529+
df_clone.create_physical_plan().await.unwrap()
530+
}));
531+
})
532+
},
533+
);
534+
535+
let mut chained_group =
536+
c.benchmark_group("physical_plan_chained_case_projection_sweep");
537+
for &steps in &[10usize, 20, 40] {
538+
for &case_depth in &[5usize, 10, 15] {
539+
let df = build_chained_case_projection_df(&rt, steps, case_depth, 30);
540+
let label = format!("steps={steps},depth={case_depth}");
541+
chained_group.bench_with_input(
542+
BenchmarkId::new("physical_plan", &label),
543+
&df,
544+
|b, df| {
545+
b.iter(|| {
546+
let df_clone = df.clone();
547+
black_box(rt.block_on(async {
548+
df_clone.create_physical_plan().await.unwrap()
549+
}));
550+
})
551+
},
552+
);
553+
}
554+
}
555+
chained_group.finish();
463556
}
464557

465558
criterion_group!(benches, criterion_benchmark);

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,31 @@ fn optimize_subqueries(
536536
/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place).
537537
/// - `Err(error)`: An error occurred during the function call.
538538
fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
539+
// Iteratively merge as long as the current Projection's input is another
540+
// Projection that can be folded. Without this, a chain of N consecutive
541+
// projections (e.g. SQL of the form `SELECT *, <expr> AS dN FROM ...`
542+
// stacked N times) requires N applications of this rule (re-run by the
543+
// outer optimizer loop, each walking the full plan tree) before the
544+
// chain is fully collapsed.
545+
let mut current = proj;
546+
let mut transformed_any = false;
547+
loop {
548+
let result = merge_consecutive_projections_one_level(current)?;
549+
current = result.data;
550+
if !result.transformed {
551+
return Ok(if transformed_any {
552+
Transformed::yes(current)
553+
} else {
554+
Transformed::no(current)
555+
});
556+
}
557+
transformed_any = true;
558+
}
559+
}
560+
561+
fn merge_consecutive_projections_one_level(
562+
proj: Projection,
563+
) -> Result<Transformed<Projection>> {
539564
let Projection {
540565
expr,
541566
input,

datafusion/physical-plan/src/projection.rs

Lines changed: 91 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -382,12 +382,15 @@ impl ExecutionPlan for ProjectionExec {
382382
&self,
383383
projection: &ProjectionExec,
384384
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
385-
let maybe_unified = try_unifying_projections(projection, self)?;
386-
if let Some(new_plan) = maybe_unified {
387-
// To unify 3 or more sequential projections:
388-
remove_unnecessary_projections(new_plan).data().map(Some)
389-
} else {
390-
Ok(Some(Arc::new(projection.clone())))
385+
// Collapse the entire run of consecutive `ProjectionExec`s in a single
386+
// pass instead of unifying one level at a time. The pairwise approach
387+
// had to allocate (and recompute equivalence properties for) one
388+
// intermediate `ProjectionExec` per level, which is the dominant cost
389+
// for plans with many chained projections (e.g. a long pipeline of
390+
// `SELECT *, <expr> AS dN FROM ...`).
391+
match try_collapse_projection_chain(projection)? {
392+
Some(plan) => Ok(Some(plan)),
393+
None => Ok(Some(Arc::new(projection.clone()))),
391394
}
392395
}
393396

@@ -1015,54 +1018,93 @@ pub fn update_join_filter(
10151018
}
10161019

10171020
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
1018-
fn try_unifying_projections(
1019-
projection: &ProjectionExec,
1020-
child: &ProjectionExec,
1021+
/// Iteratively collapse a chain of consecutive [`ProjectionExec`]s starting
1022+
/// at `outer` into a single `ProjectionExec`. Returns `None` if the very
1023+
/// first level cannot be unified (matching the previous behavior of
1024+
/// [`try_unifying_projections`] + recursive `remove_unnecessary_projections`).
1025+
///
1026+
/// Functionally equivalent to repeatedly applying [`try_unifying_projections`]
1027+
/// and re-entering [`remove_unnecessary_projections`] on the result, but
1028+
/// avoids constructing an intermediate `ProjectionExec` (and recomputing its
1029+
/// [`PlanProperties`] / equivalence properties) for every level it walks
1030+
/// through. For long projection chains the saved work scales linearly with
1031+
/// the chain length.
1032+
fn try_collapse_projection_chain(
1033+
outer: &ProjectionExec,
10211034
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1022-
let mut projected_exprs = vec![];
1023-
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1035+
let mut current_exprs: Vec<ProjectionExpr> = outer.expr().to_vec();
1036+
let mut current_input: Arc<dyn ExecutionPlan> = Arc::clone(outer.input());
1037+
let mut collapsed_any = false;
10241038

1025-
// Collect the column references usage in the outer projection.
1026-
projection.expr().iter().for_each(|proj_expr| {
1027-
proj_expr
1028-
.expr
1029-
.apply(|expr| {
1030-
Ok({
1031-
if let Some(column) = expr.downcast_ref::<Column>() {
1032-
*column_ref_map.entry(column.clone()).or_default() += 1;
1033-
}
1034-
TreeNodeRecursion::Continue
1035-
})
1036-
})
1037-
.unwrap();
1038-
});
1039-
// Merging these projections is not beneficial, e.g
1040-
// If an expression is not trivial (KeepInPlace) and it is referred more than 1, unifies projections will be
1041-
// beneficial as caching mechanism for non-trivial computations.
1042-
// See discussion in: https://github.com/apache/datafusion/issues/8296
1043-
if column_ref_map.iter().any(|(column, count)| {
1044-
*count > 1
1045-
&& !child.expr()[column.index()]
1046-
.expr
1047-
.placement()
1048-
.should_push_to_leaves()
1049-
}) {
1050-
return Ok(None);
1051-
}
1052-
for proj_expr in projection.expr() {
1053-
// If there is no match in the input projection, we cannot unify these
1054-
// projections. This case will arise if the projection expression contains
1055-
// a `PhysicalExpr` variant `update_expr` doesn't support.
1056-
let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
1057-
return Ok(None);
1039+
loop {
1040+
// Only continue if the current input is itself a projection.
1041+
let Some(inner_proj) = current_input.downcast_ref::<ProjectionExec>()
1042+
else {
1043+
break;
10581044
};
1059-
projected_exprs.push(ProjectionExpr {
1060-
expr,
1061-
alias: proj_expr.alias.clone(),
1045+
1046+
// Replicate the "merging is beneficial" guard from
1047+
// `try_unifying_projections`: if any column referenced more than once
1048+
// in `current_exprs` resolves to a non-trivial expression in the
1049+
// inner projection, fusing them would duplicate that computation.
1050+
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1051+
for proj_expr in &current_exprs {
1052+
proj_expr.expr.apply(|expr| {
1053+
if let Some(column) = expr.downcast_ref::<Column>() {
1054+
*column_ref_map.entry(column.clone()).or_default() += 1;
1055+
}
1056+
Ok(TreeNodeRecursion::Continue)
1057+
})?;
1058+
}
1059+
let inner_exprs = inner_proj.expr();
1060+
let blocked = column_ref_map.iter().any(|(column, count)| {
1061+
*count > 1
1062+
&& !inner_exprs[column.index()]
1063+
.expr
1064+
.placement()
1065+
.should_push_to_leaves()
10621066
});
1067+
if blocked {
1068+
break;
1069+
}
1070+
1071+
// Substitute each outer expression through the inner projection.
1072+
let mut new_exprs: Vec<ProjectionExpr> =
1073+
Vec::with_capacity(current_exprs.len());
1074+
let mut can_unify = true;
1075+
for proj_expr in &current_exprs {
1076+
match update_expr(&proj_expr.expr, inner_exprs, true)? {
1077+
Some(expr) => new_exprs.push(ProjectionExpr {
1078+
expr,
1079+
alias: proj_expr.alias.clone(),
1080+
}),
1081+
None => {
1082+
can_unify = false;
1083+
break;
1084+
}
1085+
}
1086+
}
1087+
if !can_unify {
1088+
break;
1089+
}
1090+
1091+
current_exprs = new_exprs;
1092+
current_input = Arc::clone(inner_proj.input());
1093+
collapsed_any = true;
10631094
}
1064-
ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
1065-
.map(|e| Some(Arc::new(e) as _))
1095+
1096+
if !collapsed_any {
1097+
return Ok(None);
1098+
}
1099+
1100+
// After collapsing the projection chain, hand the unified projection back
1101+
// to `remove_unnecessary_projections` once. This gives the non-Projection
1102+
// input (e.g. `DataSourceExec`) the chance to absorb the projection via
1103+
// its own `try_swapping_with_projection` impl — the same behaviour the
1104+
// pairwise recursion used to provide for the final step.
1105+
let unified: Arc<dyn ExecutionPlan> =
1106+
Arc::new(ProjectionExec::try_new(current_exprs, current_input)?);
1107+
remove_unnecessary_projections(unified).data().map(Some)
10661108
}
10671109

10681110
/// Collect all column indices from the given projection expressions.

0 commit comments

Comments
 (0)