Skip to content

Commit 0da8961

Browse files
Dandandanclaude
andauthored
perf: collapse chained projections in a single optimizer pass; reduce memory usage / recursion (apache#22389)
## Which issue does this PR close? - Closes apache#22391. ## Rationale for this change Long chains of consecutive projections (e.g. `SELECT *, <expr> AS dN FROM ...` stacked N times, with depth-K nested CASE per step) caused planning to use multi-GB of RAM and tens of seconds of wall time, scaling superlinearly in chain length and CASE depth. See apache#22391 for the OOM/timing data. ## What changes are included in this PR? Three independent fixes targeting the same workload: 1. **Physical chain collapse** (`datafusion/physical-plan/src/projection.rs`): replace the pairwise recursive unification in `ProjectionExec::try_swapping_with_projection` with `try_collapse_projection_chain`, which walks the entire run of consecutive `ProjectionExec`s and builds **one** final `ProjectionExec` (saves N-1 intermediate constructions and their `compute_properties` calls). Leaf pushdown into a non-`Projection` input is preserved by calling `remove_unnecessary_projections` once at the end. 2. **Logical iterative merge** (`datafusion/optimizer/src/optimize_projections/mod.rs`): wrap `merge_consecutive_projections` in an internal loop so an N-deep `LogicalPlan::Projection` chain collapses in a single rule application instead of N outer fixpoint passes. 3. **`update_expr` Column-equality short-circuit** (`datafusion/physical-expr/src/projection.rs`): when substituting a Column with one that equals it (the pass-through case during chain collapse), return `Transformed::no` so `transform_up` does not rebuild the enclosing `CaseExpr` / `CaseBody`. This is the OOM fix — it eliminates the O(N² · K²) cascade of CASE allocations. ## Are these changes tested? - yes, existing test + added bench | Stage | Time | Δ vs master | |---|---:|---:| | master | 623 ms | — | | + chain collapse + logical merge loop | 364 ms | −41.5% | | + `update_expr` Column-equality short-circuit | **155 ms** | **−75.4%** | (criterion p<0.05, CI [−75.67%, −75.16%]) ## Are there any user-facing changes? No public API change. --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4055e44 commit 0da8961

5 files changed

Lines changed: 160 additions & 56 deletions

File tree

datafusion/core/benches/sql_planner_extended.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,57 @@ fn build_non_case_left_join_df_with_push_down_filter(
324324
rt.block_on(async { ctx.sql(&query).await.unwrap() })
325325
}
326326

327+
/// Join + wide-OR filter + N chained CTEs, each adding one column defined
328+
/// by a depth-K nested CASE ladder over the same input column. Exercises
329+
/// the physical `ProjectionPushdown` rule on long projection chains.
330+
fn build_chained_case_projection_query(
331+
chained_steps: usize,
332+
case_depth: usize,
333+
or_width: usize,
334+
) -> String {
335+
let mut q = String::new();
336+
q.push_str("WITH s0 AS (\n SELECT l.c0, l.c1 FROM t l LEFT JOIN t r ON l.c0 = r.c0");
337+
if or_width > 0 {
338+
q.push_str("\n WHERE (");
339+
for i in 0..or_width {
340+
if i > 0 {
341+
q.push_str(" OR ");
342+
}
343+
let _ = write!(&mut q, "l.c1 = '{i}'");
344+
}
345+
q.push(')');
346+
}
347+
q.push_str("\n)");
348+
349+
for n in 1..=chained_steps {
350+
q.push_str(",\n");
351+
let _ = write!(&mut q, "s{n} AS (SELECT *, ");
352+
for d in 0..case_depth {
353+
let _ = write!(&mut q, "CASE WHEN c0 = '{d}' THEN 'label' ELSE ");
354+
}
355+
q.push_str("c0");
356+
for _ in 0..case_depth {
357+
q.push_str(" END");
358+
}
359+
let _ = write!(&mut q, " AS d{n} FROM s{prev})", prev = n - 1);
360+
}
361+
362+
let _ = write!(&mut q, "\nSELECT * FROM s{chained_steps}");
363+
q
364+
}
365+
366+
fn build_chained_case_projection_df(
367+
rt: &Runtime,
368+
chained_steps: usize,
369+
case_depth: usize,
370+
or_width: usize,
371+
) -> DataFrame {
372+
let ctx = SessionContext::new();
373+
register_string_table(&ctx, 100, 1000);
374+
let query = build_chained_case_projection_query(chained_steps, case_depth, or_width);
375+
rt.block_on(async { ctx.sql(&query).await.unwrap() })
376+
}
377+
327378
fn criterion_benchmark(c: &mut Criterion) {
328379
let baseline_ctx = SessionContext::new();
329380
let case_heavy_ctx = SessionContext::new();
@@ -460,6 +511,16 @@ fn criterion_benchmark(c: &mut Criterion) {
460511
}
461512
}
462513
control_group.finish();
514+
515+
let chained_df = build_chained_case_projection_df(&rt, 80, 23, 30);
516+
c.bench_function("physical_plan_chained_case_projection_hotspot", |b| {
517+
b.iter(|| {
518+
let df_clone = chained_df.clone();
519+
black_box(
520+
rt.block_on(async { df_clone.create_physical_plan().await.unwrap() }),
521+
);
522+
})
523+
});
463524
}
464525

465526
criterion_group!(benches, criterion_benchmark);

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,30 @@ fn optimize_subqueries(
536536
/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place).
537537
/// - `Err(error)`: An error occurred during the function call.
538538
fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
539+
// Collapse the whole chain in one pass; otherwise an N-deep chain needs
540+
// N outer optimizer passes to fully fold.
541+
let mut current = proj;
542+
let mut transformed_any = false;
543+
loop {
544+
let Transformed {
545+
data, transformed, ..
546+
} = merge_consecutive_projections_one_level(current)?;
547+
current = data;
548+
if !transformed {
549+
break;
550+
}
551+
transformed_any = true;
552+
}
553+
Ok(if transformed_any {
554+
Transformed::yes(current)
555+
} else {
556+
Transformed::no(current)
557+
})
558+
}
559+
560+
fn merge_consecutive_projections_one_level(
561+
proj: Projection,
562+
) -> Result<Transformed<Projection>> {
539563
let Projection {
540564
expr,
541565
input,

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -824,10 +824,9 @@ fn extension_node_does_not_block_projection_pruning() -> Result<()> {
824824
OpaqueRequirementsExtension
825825
Sort: t.a ASC NULLS FIRST, t.ts ASC NULLS FIRST
826826
Projection: t.a, CAST(t.ts AS Timestamp(ms, "UTC")) AS ts
827-
Projection: t.a, t.ts
828-
Filter: __common_expr_3 > TimestampMillisecond(1000, Some("UTC")) AND __common_expr_3 < TimestampMillisecond(2000, Some("UTC"))
829-
Projection: CAST(t.ts AS Timestamp(ms, "UTC")) AS __common_expr_3, t.a, t.ts
830-
TableScan: t projection=[a, ts], partial_filters=[t.ts > TimestampNanosecond(1000000000, None), t.ts < TimestampNanosecond(2000000000, None), CAST(t.ts AS Timestamp(ms, "UTC")) > TimestampMillisecond(1000, Some("UTC")), CAST(t.ts AS Timestamp(ms, "UTC")) < TimestampMillisecond(2000, Some("UTC"))]
827+
Filter: __common_expr_3 > TimestampMillisecond(1000, Some("UTC")) AND __common_expr_3 < TimestampMillisecond(2000, Some("UTC"))
828+
Projection: CAST(t.ts AS Timestamp(ms, "UTC")) AS __common_expr_3, t.a, t.ts
829+
TableScan: t projection=[a, ts], partial_filters=[t.ts > TimestampNanosecond(1000000000, None), t.ts < TimestampNanosecond(2000000000, None), CAST(t.ts AS Timestamp(ms, "UTC")) > TimestampMillisecond(1000, Some("UTC")), CAST(t.ts AS Timestamp(ms, "UTC")) < TimestampMillisecond(2000, Some("UTC"))]
831830
"#,
832831
);
833832

datafusion/physical-expr/src/projection.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -963,15 +963,24 @@ pub fn update_expr(
963963
return Ok(Transformed::no(expr));
964964
};
965965
if unproject {
966-
state = RewriteState::RewrittenValid;
967-
// Update the index of `column`:
968966
let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
969967
internal_datafusion_err!(
970968
"Column index {} out of bounds for projected expressions of length {}",
971969
column.index(),
972970
projected_exprs.len()
973971
)
974972
})?;
973+
// Skip rebuilding the parent if substituting with an equal
974+
// Column (e.g. pass-through `c0@0` -> `c0@0` during chained
975+
// projection collapse). Without this, every CASE/BinaryExpr
976+
// containing such a Column is reconstructed unnecessarily.
977+
if let Some(projected_col) =
978+
projected_expr.expr.downcast_ref::<Column>()
979+
&& projected_col == column
980+
{
981+
return Ok(Transformed::no(expr));
982+
}
983+
state = RewriteState::RewrittenValid;
975984
Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
976985
} else {
977986
// default to invalid, in case we can't find the relevant column

datafusion/physical-plan/src/projection.rs

Lines changed: 61 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -382,12 +382,9 @@ impl ExecutionPlan for ProjectionExec {
382382
&self,
383383
projection: &ProjectionExec,
384384
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
385-
let maybe_unified = try_unifying_projections(projection, self)?;
386-
if let Some(new_plan) = maybe_unified {
387-
// To unify 3 or more sequential projections:
388-
remove_unnecessary_projections(new_plan).data().map(Some)
389-
} else {
390-
Ok(Some(Arc::new(projection.clone())))
385+
match try_collapse_projection_chain(projection)? {
386+
Some(plan) => Ok(Some(plan)),
387+
None => Ok(Some(Arc::new(projection.clone()))),
391388
}
392389
}
393390

@@ -1014,55 +1011,69 @@ pub fn update_join_filter(
10141011
})
10151012
}
10161013

1017-
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
1018-
fn try_unifying_projections(
1019-
projection: &ProjectionExec,
1020-
child: &ProjectionExec,
1014+
/// Collapse a chain of consecutive [`ProjectionExec`]s into one. Returns
1015+
/// `None` if nothing could be merged.
1016+
fn try_collapse_projection_chain(
1017+
outer: &ProjectionExec,
10211018
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1022-
let mut projected_exprs = vec![];
1019+
let mut current_exprs: Vec<ProjectionExpr> = outer.expr().to_vec();
1020+
let mut current_input: Arc<dyn ExecutionPlan> = Arc::clone(outer.input());
10231021
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1022+
let mut collapsed_any = false;
1023+
1024+
'outer: while let Some(inner_proj) = current_input.downcast_ref::<ProjectionExec>() {
1025+
// Collect the column references usage in the outer projection.
1026+
column_ref_map.clear();
1027+
for proj_expr in &current_exprs {
1028+
proj_expr.expr.apply(|expr| {
1029+
if let Some(column) = expr.downcast_ref::<Column>() {
1030+
*column_ref_map.entry(column.clone()).or_default() += 1;
1031+
}
1032+
Ok(TreeNodeRecursion::Continue)
1033+
})?;
1034+
}
1035+
let inner_exprs = inner_proj.expr();
1036+
// Merging these projections is not beneficial, e.g
1037+
// If an expression is not trivial (KeepInPlace) and it is referred more than 1, unifies projections will be
1038+
// beneficial as caching mechanism for non-trivial computations.
1039+
// See discussion in: https://github.com/apache/datafusion/issues/8296
1040+
let blocked = column_ref_map.iter().any(|(column, count)| {
1041+
*count > 1
1042+
&& !inner_exprs[column.index()]
1043+
.expr
1044+
.placement()
1045+
.should_push_to_leaves()
1046+
});
1047+
if blocked {
1048+
break;
1049+
}
10241050

1025-
// Collect the column references usage in the outer projection.
1026-
projection.expr().iter().for_each(|proj_expr| {
1027-
proj_expr
1028-
.expr
1029-
.apply(|expr| {
1030-
Ok({
1031-
if let Some(column) = expr.downcast_ref::<Column>() {
1032-
*column_ref_map.entry(column.clone()).or_default() += 1;
1033-
}
1034-
TreeNodeRecursion::Continue
1035-
})
1036-
})
1037-
.unwrap();
1038-
});
1039-
// Merging these projections is not beneficial, e.g
1040-
// If an expression is not trivial (KeepInPlace) and it is referred more than 1, unifies projections will be
1041-
// beneficial as caching mechanism for non-trivial computations.
1042-
// See discussion in: https://github.com/apache/datafusion/issues/8296
1043-
if column_ref_map.iter().any(|(column, count)| {
1044-
*count > 1
1045-
&& !child.expr()[column.index()]
1046-
.expr
1047-
.placement()
1048-
.should_push_to_leaves()
1049-
}) {
1050-
return Ok(None);
1051+
let mut new_phys: Vec<Arc<dyn PhysicalExpr>> =
1052+
Vec::with_capacity(current_exprs.len());
1053+
for proj_expr in &current_exprs {
1054+
// If there is no match in the input projection, we cannot unify these
1055+
// projections. This case will arise if the projection expression contains
1056+
// a `PhysicalExpr` variant `update_expr` doesn't support.
1057+
let Some(expr) = update_expr(&proj_expr.expr, inner_exprs, true)? else {
1058+
break 'outer;
1059+
};
1060+
new_phys.push(expr);
1061+
}
1062+
for (proj_expr, expr) in current_exprs.iter_mut().zip(new_phys) {
1063+
proj_expr.expr = expr;
1064+
}
1065+
current_input = Arc::clone(inner_proj.input());
1066+
collapsed_any = true;
10511067
}
1052-
for proj_expr in projection.expr() {
1053-
// If there is no match in the input projection, we cannot unify these
1054-
// projections. This case will arise if the projection expression contains
1055-
// a `PhysicalExpr` variant `update_expr` doesn't support.
1056-
let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
1057-
return Ok(None);
1058-
};
1059-
projected_exprs.push(ProjectionExpr {
1060-
expr,
1061-
alias: proj_expr.alias.clone(),
1062-
});
1068+
1069+
if !collapsed_any {
1070+
return Ok(None);
10631071
}
1064-
ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
1065-
.map(|e| Some(Arc::new(e) as _))
1072+
1073+
// To unify 3 or more sequential projections:
1074+
let unified: Arc<dyn ExecutionPlan> =
1075+
Arc::new(ProjectionExec::try_new(current_exprs, current_input)?);
1076+
remove_unnecessary_projections(unified).data().map(Some)
10661077
}
10671078

10681079
/// Collect all column indices from the given projection expressions.

0 commit comments

Comments
 (0)