Skip to content

Commit ebcee69

Browse files
JanKaul and claude committed
reorder_join: guard QueryNode::rank against cost=0, improve Aggregate/Inner-Join cardinality
- Mirror the cost==0 guard from PrecedenceTreeNode::rank to QueryNode::rank. Without this, root nodes (which are hardcoded cost=0) combined with a selectivity=1.0 fallback produced (1-1)/0 = NaN, which then panicked inside denormalize's `partial_cmp(...).unwrap()`. Q18 in IN-subquery form hit this and crashed the optimizer.
- Port duckdb's ExtractAggregationStats heuristic (relation_statistics_helper.cpp:359-437) into estimate_cardinality::Aggregate:
  - Ungrouped aggregate → 1 row.
  - With per-group-key NDV: product * 0.95^(n-1) correction, then the Occupancy-Problem formula `product * (1 - exp(-input/product))`, clamped to `[1, input]`.
  - Without NDV: fall back to `max(input/2, 1)` instead of `0.1*input`.

  This makes the aggregate's estimated cardinality reflect the actual number of distinct group keys when they can be sized, instead of passing input rows straight through.
- Relax is_group_key matching to compare by column name only. A SubqueryAlias wrapping an aggregate rewrites the relation prefix on the way back up, so strict relation-equality dropped legitimate group-key references (e.g. `t.l_orderkey` failing to match `lineitem.l_orderkey`).
- For non-group columns asked of an Aggregate, return the post-aggregate row count as a loose NDV upper bound instead of erroring. The error used to bubble up and force callers (Filter, Projection, SubqueryAlias) into the multi-input catch-all, leaving the join's ndv lookup empty and the selectivity stuck at the 0.1 fallback.
- Add an explicit Inner-Join arm to estimate_cardinality. Without it, any caller asking for the cardinality of a join subtree the flattener absorbed as an opaque node (e.g. when a Projection sits between two Inner Joins, as optimize_projections inserts) errored with "Cannot estimate cardinality for plan with multiple inputs", again degrading the cost model to constants.

End-to-end on TPC-H Q18 against an iceberg FileCatalog at SF=100:
- IN-subquery form (q18.sql, threshold 313) no longer panics. The reorder rule flips the LeftSemi over (customer x lineitem x orders) to a RightSemi with the aggregated subquery on the build side.
- CTE form (q18s.sql, threshold 300): the reorder rule swaps the top Inner Join's children so the aggregated CTE is the logical LEFT (intended build) side. The physical join_selection rule may still re-pick based on physical Statistics, which is upstream of this optimizer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1340df0 commit ebcee69

2 files changed

Lines changed: 91 additions & 8 deletions

File tree

datafusion/optimizer/src/reorder_join/cost.rs

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,24 +144,61 @@ fn estimate_cardinality(plan: &LogicalPlan, column: Option<&Column>) -> Result<f
144144
},
145145
LogicalPlan::Aggregate(agg) => match column {
146146
None => {
147+
// Ungrouped aggregate → exactly 1 row.
148+
if agg.group_expr.is_empty() {
149+
return Ok(1.0);
150+
}
147151
let input = estimate_cardinality(&agg.input, None)?;
148-
Ok(0.1 * input)
152+
// Per-group-key NDV from the child plan, where available.
153+
// Mirrors duckdb's `ExtractAggregationStats`
154+
// (relation_statistics_helper.cpp:380-415): start with the
155+
// product of per-key NDVs, apply a correlation correction,
156+
// then use the Occupancy-Problem formula to estimate the
157+
// number of group-key tuples actually occupied given
158+
// `input` rows.
159+
let ndvs: Vec<f64> = agg
160+
.group_expr
161+
.iter()
162+
.filter_map(|e| match e {
163+
Expr::Column(c) => Some(c),
164+
_ => None,
165+
})
166+
.filter_map(|c| estimate_cardinality(&agg.input, Some(c)).ok())
167+
.map(|n| if n <= 0.0 { 1.0 } else { n })
168+
.collect();
169+
if ndvs.is_empty() || ndvs.len() < agg.group_expr.len() {
170+
// No (or partial) per-key NDV. Half the input is a
171+
// less-pessimistic default than `0.1 * input`, matching
172+
// duckdb's fallback at relation_statistics_helper.cpp:394.
173+
return Ok((input / 2.0).max(1.0));
174+
}
175+
let product: f64 = ndvs.iter().product();
176+
let correction = 0.95_f64.powi((ndvs.len() as i32) - 1);
177+
let product = product * correction;
178+
let mult = 1.0 - (-input / product).exp();
179+
let new_card = if mult == 0.0 { input } else { product * mult };
180+
Ok(new_card.min(input).max(1.0))
149181
}
150182
Some(c) => {
151183
// Group-by keys are unique in the aggregate's output, so
152184
// NDV(group_key) equals the post-aggregate row count.
185+
// Match by column name only — a SubqueryAlias wrapping the
186+
// aggregate rewrites the relation prefix, so a strict
187+
// `relation == relation` comparison would miss legitimate
188+
// group keys.
153189
let is_group_key = agg.group_expr.iter().any(|e| match e {
154-
Expr::Column(g) => g.name == c.name && g.relation == c.relation,
190+
Expr::Column(g) => g.name == c.name,
155191
_ => false,
156192
});
157193
if is_group_key {
158194
estimate_cardinality(plan, None)
159195
} else {
160-
plan_err!(
161-
"Cannot estimate NDV of non-group-by column \
162-
`{}` through Aggregate",
163-
c.name
164-
)
196+
// For non-group columns, the post-aggregate NDV is
197+
// bounded by the row count (most one distinct value per
198+
// output row). Return that as a loose upper bound
199+
// instead of erroring, so callers (e.g.
200+
// `selectivity()`) can still compute a fallback.
201+
estimate_cardinality(plan, None)
165202
}
166203
}
167204
},
@@ -233,6 +270,48 @@ fn estimate_cardinality(plan: &LogicalPlan, column: Option<&Column>) -> Result<f
233270
Some(c) => estimate_cardinality(preserved, Some(c)),
234271
}
235272
}
273+
// Inner joins (and the cross-product, encoded as Inner with empty
274+
// `on`) appear here when an upstream caller asks about a join
275+
// subtree that the flattener absorbed as an opaque graph node
276+
// (e.g. when a projection or other wrapper sits between joins).
277+
// Estimate via the same NDV-of-the-largest-side formula
278+
// `selectivity()` uses for inner equi-joins, falling back to 0.1
279+
// when NDV is unavailable.
280+
LogicalPlan::Join(j) if j.join_type == JoinType::Inner => {
281+
let left_card = estimate_cardinality(&j.left, None)?;
282+
let right_card = estimate_cardinality(&j.right, None)?;
283+
let cross = left_card * right_card;
284+
let sel = if let Some((a, b)) = j.on.first() {
285+
let ndv_max = match (a, b) {
286+
(Expr::Column(ca), Expr::Column(cb)) => {
287+
let na = estimate_cardinality(&j.left, Some(ca))
288+
.ok()
289+
.or_else(|| estimate_cardinality(&j.right, Some(ca)).ok());
290+
let nb = estimate_cardinality(&j.right, Some(cb))
291+
.ok()
292+
.or_else(|| estimate_cardinality(&j.left, Some(cb)).ok());
293+
match (na, nb) {
294+
(Some(x), Some(y)) if x.max(y) > 0.0 => Some(x.max(y)),
295+
_ => None,
296+
}
297+
}
298+
_ => None,
299+
};
300+
ndv_max.map(|n| 1.0 / n).unwrap_or(0.1)
301+
} else {
302+
1.0
303+
};
304+
match column {
305+
None => Ok((sel * cross).max(1.0)),
306+
Some(c) => {
307+
// NDV of a column on the join output is bounded by the
308+
// child-side NDV (joins don't create new distinct values
309+
// for already-existing columns).
310+
estimate_cardinality(&j.left, Some(c))
311+
.or_else(|_| estimate_cardinality(&j.right, Some(c)))
312+
}
313+
}
314+
}
236315
x => {
237316
let inputs = x.inputs();
238317
if inputs.len() == 1 {

datafusion/optimizer/src/reorder_join/left_deep_join_plan.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,11 @@ struct QueryNode {
159159

160160
impl QueryNode {
161161
fn rank(&self) -> f64 {
162-
(self.selectivity - 1.0) / self.cost
162+
if self.cost == 0.0 {
163+
0.0
164+
} else {
165+
(self.selectivity - 1.0) / self.cost
166+
}
163167
}
164168
}
165169

0 commit comments

Comments
 (0)