Skip to content

Commit 989ff3d

Browse files
committed
Merge branch 'temp' into reclaimer-join-reorder
2 parents f625865 + 32d3c8d commit 989ff3d

8 files changed

Lines changed: 2479 additions & 2 deletions

File tree

datafusion/catalog/src/default_table_source.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use crate::TableProvider;
2424

2525
use arrow::datatypes::SchemaRef;
26-
use datafusion_common::{Constraints, internal_err};
26+
use datafusion_common::{Constraints, Statistics, internal_err};
2727
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
2828

2929
/// Implements [`TableSource`] for a [`TableProvider`]
@@ -77,6 +77,10 @@ impl TableSource for DefaultTableSource {
7777
fn get_column_default(&self, column: &str) -> Option<&Expr> {
7878
self.table_provider.get_column_default(column)
7979
}
80+
81+
/// Returns the statistics reported by the wrapped table provider, if any.
fn statistics(&self) -> Option<Statistics> {
82+
// Pure delegation: the provider's answer is forwarded unchanged.
self.table_provider.statistics()
83+
}
8084
}
8185

8286
/// Wrap TableProvider in TableSource

datafusion/expr/src/table_source.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::{Expr, LogicalPlan};
2121

2222
use arrow::datatypes::SchemaRef;
23-
use datafusion_common::{Constraints, Result};
23+
use datafusion_common::{Constraints, Result, Statistics};
2424

2525
use std::{any::Any, borrow::Cow};
2626

@@ -127,6 +127,14 @@ pub trait TableSource: Any + Sync + Send {
127127
fn get_column_default(&self, _column: &str) -> Option<&Expr> {
128128
None
129129
}
130+
131+
/// Get statistics for this table, if available.
///
132+
/// Although not presently used in mainline DataFusion, this allows implementation-specific
133+
/// behavior for downstream repositories, in conjunction with specialized optimizer rules to
134+
/// perform operations such as re-ordering of joins.
///
/// The default implementation returns `None`, meaning no statistics are available.
135+
fn statistics(&self) -> Option<Statistics> {
136+
None
137+
}
130138
}
131139

132140
impl dyn TableSource {

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub mod optimizer;
6565
pub mod propagate_empty_relation;
6666
pub mod push_down_filter;
6767
pub mod push_down_limit;
68+
pub mod reorder_join;
6869
pub mod replace_distinct_aggregate;
6970
pub mod rewrite_set_comparison;
7071
pub mod scalar_subquery_to_join;
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_common::{Column, Result, plan_err, stats::Precision};
19+
use datafusion_expr::{Expr, JoinType, LogicalPlan};
20+
21+
use super::join_graph::Edge;
22+
23+
/// Fraction of preserved-side rows estimated to survive a semi/anti join
24+
/// when column NDV statistics are unavailable. Mirrors DuckDB's
25+
/// `CardinalityEstimator::DEFAULT_SEMI_ANTI_SELECTIVITY = 1/5`.
26+
const DEFAULT_SEMI_ANTI_SELECTIVITY: f64 = 0.2;
27+
28+
pub trait JoinCostEstimator: std::fmt::Debug {
29+
/// Cardinality of `plan`.
30+
///
31+
/// - `column = None`: number of output rows of `plan`.
32+
/// - `column = Some(c)`: number of distinct values of column `c`
33+
/// in `plan`'s output (NDV).
34+
fn cardinality(&self, plan: &LogicalPlan, column: Option<&Column>) -> Option<f64> {
35+
estimate_cardinality(plan, column).ok()
36+
}
37+
38+
/// Estimated selectivity of joining `left` with `right` via `edge`.
39+
///
40+
/// Default: `1 / max(NDV(left.key), NDV(right.key))` for equi-joins
41+
/// (inner and semi/anti) when both NDVs are available; otherwise a
42+
/// per-join-type constant.
43+
fn selectivity(&self, edge: &Edge, left: &LogicalPlan, right: &LogicalPlan) -> f64 {
44+
let fallback = match edge.join_type {
45+
JoinType::Inner => 0.1,
46+
JoinType::LeftSemi
47+
| JoinType::LeftAnti
48+
| JoinType::RightSemi
49+
| JoinType::RightAnti => DEFAULT_SEMI_ANTI_SELECTIVITY,
50+
_ => 1.0,
51+
};
52+
let is_eq_join = matches!(
53+
edge.join_type,
54+
JoinType::Inner
55+
| JoinType::LeftSemi
56+
| JoinType::LeftAnti
57+
| JoinType::RightSemi
58+
| JoinType::RightAnti
59+
);
60+
if !is_eq_join || edge.on.is_empty() {
61+
return fallback;
62+
}
63+
// Use only the first equi-pair. Compounding pairwise selectivities
64+
// under independence assumptions overestimates selectivity when
65+
// composite-key columns are correlated, which is the common case.
66+
let (a, b) = &edge.on[0];
67+
let (Some(col_a), Some(col_b)) = (key_column(a), key_column(b)) else {
68+
return fallback;
69+
};
70+
let ndv_a = ndv_for(self, col_a, left, right);
71+
let ndv_b = ndv_for(self, col_b, left, right);
72+
match edge.join_type {
73+
JoinType::Inner => match (ndv_a, ndv_b) {
74+
(Some(a), Some(b)) if a.max(b) > 0.0 => 1.0 / a.max(b),
75+
_ => fallback,
76+
},
77+
// Semi/anti containment estimator: surviving fraction of the
78+
// preserved side ≈ `min(NDV_preserved, NDV_filtering) / NDV_preserved`.
79+
// Edges normalized by `flatten_joins_recursive` always have
80+
// `on = (preserved_key, filtering_key)`, so the preserved
81+
// NDV is `ndv_a` for Left{Semi,Anti}. RightSemi/RightAnti
82+
// shouldn't appear in graph edges (they get normalized) but
83+
// are handled defensively.
84+
JoinType::LeftSemi | JoinType::LeftAnti => match (ndv_a, ndv_b) {
85+
(Some(a), Some(b)) if a > 0.0 => (a.min(b) / a).min(1.0),
86+
_ => fallback,
87+
},
88+
JoinType::RightSemi | JoinType::RightAnti => match (ndv_a, ndv_b) {
89+
(Some(a), Some(b)) if b > 0.0 => (a.min(b) / b).min(1.0),
90+
_ => fallback,
91+
},
92+
_ => fallback,
93+
}
94+
}
95+
96+
fn cost(&self, selectivity: f64, cardinality: f64) -> f64 {
97+
selectivity * cardinality
98+
}
99+
}
100+
101+
/// Default [`JoinCostEstimator`]: a stateless marker type that overrides nothing
/// and therefore uses the trait's provided `cardinality`, `selectivity`, and
/// `cost` methods.
102+
#[derive(Debug, Clone, Copy)]
103+
pub struct DefaultCostEstimator;
104+
105+
// Intentionally empty: every trait method has a default body.
impl JoinCostEstimator for DefaultCostEstimator {}
106+
107+
fn key_column(expr: &Expr) -> Option<&Column> {
108+
match expr {
109+
Expr::Column(c) => Some(c),
110+
_ => None,
111+
}
112+
}
113+
114+
/// Look up NDV of `column` on whichever side (left or right) owns it.
115+
fn ndv_for<E: JoinCostEstimator + ?Sized>(
116+
estimator: &E,
117+
column: &Column,
118+
left: &LogicalPlan,
119+
right: &LogicalPlan,
120+
) -> Option<f64> {
121+
if left.schema().has_column(column) {
122+
estimator.cardinality(left, Some(column))
123+
} else if right.schema().has_column(column) {
124+
estimator.cardinality(right, Some(column))
125+
} else {
126+
None
127+
}
128+
}
129+
130+
fn estimate_cardinality(plan: &LogicalPlan, column: Option<&Column>) -> Result<f64> {
131+
match plan {
132+
LogicalPlan::Filter(filter) => match column {
133+
None => {
134+
let input = estimate_cardinality(&filter.input, None)?;
135+
Ok(0.1 * input)
136+
}
137+
Some(c) => {
138+
// NDV is bounded above by the input's NDV and by the
139+
// surviving row count.
140+
let ndv_in = estimate_cardinality(&filter.input, Some(c))?;
141+
let rows_out = estimate_cardinality(plan, None).unwrap_or(ndv_in);
142+
Ok(ndv_in.min(rows_out))
143+
}
144+
},
145+
LogicalPlan::Aggregate(agg) => match column {
146+
None => {
147+
// Ungrouped aggregate → exactly 1 row.
148+
if agg.group_expr.is_empty() {
149+
return Ok(1.0);
150+
}
151+
let input = estimate_cardinality(&agg.input, None)?;
152+
// Per-group-key NDV from the child plan, where available.
153+
// Mirrors duckdb's `ExtractAggregationStats`
154+
// (relation_statistics_helper.cpp:380-415): start with the
155+
// product of per-key NDVs, apply a correlation correction,
156+
// then use the Occupancy-Problem formula to estimate the
157+
// number of group-key tuples actually occupied given
158+
// `input` rows.
159+
let ndvs: Vec<f64> = agg
160+
.group_expr
161+
.iter()
162+
.filter_map(|e| match e {
163+
Expr::Column(c) => Some(c),
164+
_ => None,
165+
})
166+
.filter_map(|c| estimate_cardinality(&agg.input, Some(c)).ok())
167+
.map(|n| if n <= 0.0 { 1.0 } else { n })
168+
.collect();
169+
if ndvs.is_empty() || ndvs.len() < agg.group_expr.len() {
170+
// No (or partial) per-key NDV. Half the input is a
171+
// less-pessimistic default than `0.1 * input`, matching
172+
// duckdb's fallback at relation_statistics_helper.cpp:394.
173+
return Ok((input / 2.0).max(1.0));
174+
}
175+
let product: f64 = ndvs.iter().product();
176+
let correction = 0.95_f64.powi((ndvs.len() as i32) - 1);
177+
let product = product * correction;
178+
let mult = 1.0 - (-input / product).exp();
179+
let new_card = if mult == 0.0 { input } else { product * mult };
180+
Ok(new_card.min(input).max(1.0))
181+
}
182+
Some(c) => {
183+
// Group-by keys are unique in the aggregate's output, so
184+
// NDV(group_key) equals the post-aggregate row count.
185+
// Match by column name only — a SubqueryAlias wrapping the
186+
// aggregate rewrites the relation prefix, so a strict
187+
// `relation == relation` comparison would miss legitimate
188+
// group keys.
189+
let is_group_key = agg.group_expr.iter().any(|e| match e {
190+
Expr::Column(g) => g.name == c.name,
191+
_ => false,
192+
});
193+
if is_group_key {
194+
estimate_cardinality(plan, None)
195+
} else {
196+
// For non-group columns, the post-aggregate NDV is
197+
// bounded by the row count (most one distinct value per
198+
// output row). Return that as a loose upper bound
199+
// instead of erroring, so callers (e.g.
200+
// `selectivity()`) can still compute a fallback.
201+
estimate_cardinality(plan, None)
202+
}
203+
}
204+
},
205+
LogicalPlan::TableScan(scan) => {
206+
let stats = scan.source.statistics().ok_or_else(|| {
207+
datafusion_common::DataFusionError::Plan(format!(
208+
"TableSource for `{}` does not expose statistics",
209+
scan.table_name
210+
))
211+
})?;
212+
match column {
213+
None => match stats.num_rows {
214+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
215+
Precision::Absent => plan_err!(
216+
"TableSource for `{}` does not provide a row count",
217+
scan.table_name
218+
),
219+
},
220+
Some(c) => {
221+
// `column_statistics` is indexed by the source schema
222+
// (pre-projection), so resolve the column there.
223+
let idx = scan.source.schema().index_of(&c.name).map_err(|_| {
224+
datafusion_common::DataFusionError::Plan(format!(
225+
"Column `{}` not found in source schema of `{}`",
226+
c.name, scan.table_name
227+
))
228+
})?;
229+
let col_stats =
230+
stats.column_statistics.get(idx).ok_or_else(|| {
231+
datafusion_common::DataFusionError::Plan(format!(
232+
"Column statistics missing for index {idx} \
233+
on `{}`",
234+
scan.table_name
235+
))
236+
})?;
237+
match col_stats.distinct_count {
238+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
239+
Precision::Absent => plan_err!(
240+
"Column `{}` on `{}` has no distinct-count statistic",
241+
c.name,
242+
scan.table_name
243+
),
244+
}
245+
}
246+
}
247+
}
248+
// Semi/anti joins do not grow rows: the output cardinality is
249+
// bounded by the preserved side. We size them via the
250+
// `DEFAULT_SEMI_ANTI_SELECTIVITY` heuristic. NDV queries on the
251+
// output route to whichever side is preserved.
252+
LogicalPlan::Join(j)
253+
if matches!(
254+
j.join_type,
255+
JoinType::LeftSemi
256+
| JoinType::LeftAnti
257+
| JoinType::RightSemi
258+
| JoinType::RightAnti
259+
) =>
260+
{
261+
let preserved = match j.join_type {
262+
JoinType::LeftSemi | JoinType::LeftAnti => &j.left,
263+
_ => &j.right,
264+
};
265+
match column {
266+
None => {
267+
let rows = estimate_cardinality(preserved, None)?;
268+
Ok(rows * DEFAULT_SEMI_ANTI_SELECTIVITY)
269+
}
270+
Some(c) => estimate_cardinality(preserved, Some(c)),
271+
}
272+
}
273+
// Inner joins (and the cross-product, encoded as Inner with empty
274+
// `on`) appear here when an upstream caller asks about a join
275+
// subtree that the flattener absorbed as an opaque graph node
276+
// (e.g. when a projection or other wrapper sits between joins).
277+
// Estimate via the same NDV-of-the-largest-side formula
278+
// `selectivity()` uses for inner equi-joins, falling back to 0.1
279+
// when NDV is unavailable.
280+
LogicalPlan::Join(j) if j.join_type == JoinType::Inner => {
281+
let left_card = estimate_cardinality(&j.left, None)?;
282+
let right_card = estimate_cardinality(&j.right, None)?;
283+
let cross = left_card * right_card;
284+
let sel = if let Some((a, b)) = j.on.first() {
285+
let ndv_max = match (a, b) {
286+
(Expr::Column(ca), Expr::Column(cb)) => {
287+
let na = estimate_cardinality(&j.left, Some(ca))
288+
.ok()
289+
.or_else(|| estimate_cardinality(&j.right, Some(ca)).ok());
290+
let nb = estimate_cardinality(&j.right, Some(cb))
291+
.ok()
292+
.or_else(|| estimate_cardinality(&j.left, Some(cb)).ok());
293+
match (na, nb) {
294+
(Some(x), Some(y)) if x.max(y) > 0.0 => Some(x.max(y)),
295+
_ => None,
296+
}
297+
}
298+
_ => None,
299+
};
300+
ndv_max.map(|n| 1.0 / n).unwrap_or(0.1)
301+
} else {
302+
1.0
303+
};
304+
match column {
305+
None => Ok((sel * cross).max(1.0)),
306+
Some(c) => {
307+
// NDV of a column on the join output is bounded by the
308+
// child-side NDV (joins don't create new distinct values
309+
// for already-existing columns).
310+
estimate_cardinality(&j.left, Some(c))
311+
.or_else(|_| estimate_cardinality(&j.right, Some(c)))
312+
}
313+
}
314+
}
315+
x => {
316+
let inputs = x.inputs();
317+
if inputs.len() == 1 {
318+
estimate_cardinality(inputs[0], column)
319+
} else {
320+
plan_err!("Cannot estimate cardinality for plan with multiple inputs")
321+
}
322+
}
323+
}
324+
}

0 commit comments

Comments
 (0)