Skip to content

Commit 56a2a7c

Browse files
committed
Merge branch 'temp' into reclaimer-join-reorder
2 parents f625865 + a40a555 commit 56a2a7c

7 files changed

Lines changed: 2092 additions & 2 deletions

File tree

datafusion/catalog/src/default_table_source.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use crate::TableProvider;
2424

2525
use arrow::datatypes::SchemaRef;
26-
use datafusion_common::{Constraints, internal_err};
26+
use datafusion_common::{Constraints, Statistics, internal_err};
2727
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
2828

2929
/// Implements [`TableSource`] for a [`TableProvider`]
@@ -77,6 +77,10 @@ impl TableSource for DefaultTableSource {
7777
fn get_column_default(&self, column: &str) -> Option<&Expr> {
7878
self.table_provider.get_column_default(column)
7979
}
80+
81+
/// Returns whatever `statistics()` on the wrapped table provider reports.
fn statistics(&self) -> Option<Statistics> {
82+
self.table_provider.statistics()
83+
}
8084
}
8185

8286
/// Wrap TableProvider in TableSource

datafusion/expr/src/table_source.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::{Expr, LogicalPlan};
2121

2222
use arrow::datatypes::SchemaRef;
23-
use datafusion_common::{Constraints, Result};
23+
use datafusion_common::{Constraints, Result, Statistics};
2424

2525
use std::{any::Any, borrow::Cow};
2626

@@ -127,6 +127,14 @@ pub trait TableSource: Any + Sync + Send {
127127
fn get_column_default(&self, _column: &str) -> Option<&Expr> {
128128
None
129129
}
130+
131+
/// Get statistics for this table, if available.
132+
///
/// Although not presently used in mainline DataFusion, this allows implementation-specific
133+
/// behavior for downstream repositories, in conjunction with specialized optimizer rules to
134+
/// perform operations such as re-ordering of joins.
///
/// The default implementation returns `None`.
135+
fn statistics(&self) -> Option<Statistics> {
136+
None
137+
}
130138
}
131139

132140
impl dyn TableSource {

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub mod optimizer;
6565
pub mod propagate_empty_relation;
6666
pub mod push_down_filter;
6767
pub mod push_down_limit;
68+
pub mod reorder_join;
6869
pub mod replace_distinct_aggregate;
6970
pub mod rewrite_set_comparison;
7071
pub mod scalar_subquery_to_join;
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_common::{Column, Result, plan_err, stats::Precision};
19+
use datafusion_expr::{Expr, JoinType, LogicalPlan};
20+
21+
use super::join_graph::Edge;
22+
23+
/// Fraction of preserved-side rows estimated to survive a semi/anti join
24+
/// when column NDV statistics are unavailable. Mirrors DuckDB's
25+
/// `CardinalityEstimator::DEFAULT_SEMI_ANTI_SELECTIVITY = 1/5`.
26+
const DEFAULT_SEMI_ANTI_SELECTIVITY: f64 = 0.2;
27+
28+
pub trait JoinCostEstimator: std::fmt::Debug {
29+
/// Cardinality of `plan`.
30+
///
31+
/// - `column = None`: number of output rows of `plan`.
32+
/// - `column = Some(c)`: number of distinct values of column `c`
33+
/// in `plan`'s output (NDV).
34+
fn cardinality(&self, plan: &LogicalPlan, column: Option<&Column>) -> Option<f64> {
35+
estimate_cardinality(plan, column).ok()
36+
}
37+
38+
/// Estimated selectivity of joining `left` with `right` via `edge`.
39+
///
40+
/// Default: `1 / max(NDV(left.key), NDV(right.key))` for equi-joins
41+
/// (inner and semi/anti) when both NDVs are available; otherwise a
42+
/// per-join-type constant.
43+
fn selectivity(&self, edge: &Edge, left: &LogicalPlan, right: &LogicalPlan) -> f64 {
44+
let fallback = match edge.join_type {
45+
JoinType::Inner => 0.1,
46+
JoinType::LeftSemi
47+
| JoinType::LeftAnti
48+
| JoinType::RightSemi
49+
| JoinType::RightAnti => DEFAULT_SEMI_ANTI_SELECTIVITY,
50+
_ => 1.0,
51+
};
52+
let is_eq_join = matches!(
53+
edge.join_type,
54+
JoinType::Inner
55+
| JoinType::LeftSemi
56+
| JoinType::LeftAnti
57+
| JoinType::RightSemi
58+
| JoinType::RightAnti
59+
);
60+
if !is_eq_join || edge.on.is_empty() {
61+
return fallback;
62+
}
63+
// Use only the first equi-pair. Compounding pairwise selectivities
64+
// under independence assumptions overestimates selectivity when
65+
// composite-key columns are correlated, which is the common case.
66+
let (a, b) = &edge.on[0];
67+
let (Some(col_a), Some(col_b)) = (key_column(a), key_column(b)) else {
68+
return fallback;
69+
};
70+
let ndv_a = ndv_for(self, col_a, left, right);
71+
let ndv_b = ndv_for(self, col_b, left, right);
72+
match edge.join_type {
73+
JoinType::Inner => match (ndv_a, ndv_b) {
74+
(Some(a), Some(b)) if a.max(b) > 0.0 => 1.0 / a.max(b),
75+
_ => fallback,
76+
},
77+
// Semi/anti containment estimator: surviving fraction of the
78+
// preserved side ≈ `min(NDV_preserved, NDV_filtering) / NDV_preserved`.
79+
// Edges normalized by `flatten_joins_recursive` always have
80+
// `on = (preserved_key, filtering_key)`, so the preserved
81+
// NDV is `ndv_a` for Left{Semi,Anti}. RightSemi/RightAnti
82+
// shouldn't appear in graph edges (they get normalized) but
83+
// are handled defensively.
84+
JoinType::LeftSemi | JoinType::LeftAnti => match (ndv_a, ndv_b) {
85+
(Some(a), Some(b)) if a > 0.0 => (a.min(b) / a).min(1.0),
86+
_ => fallback,
87+
},
88+
JoinType::RightSemi | JoinType::RightAnti => match (ndv_a, ndv_b) {
89+
(Some(a), Some(b)) if b > 0.0 => (a.min(b) / b).min(1.0),
90+
_ => fallback,
91+
},
92+
_ => fallback,
93+
}
94+
}
95+
96+
fn cost(&self, selectivity: f64, cardinality: f64) -> f64 {
97+
selectivity * cardinality
98+
}
99+
}
100+
101+
/// Default implementation of JoinCostEstimator
102+
#[derive(Debug, Clone, Copy)]
103+
pub struct DefaultCostEstimator;
104+
105+
impl JoinCostEstimator for DefaultCostEstimator {}
106+
107+
fn key_column(expr: &Expr) -> Option<&Column> {
108+
match expr {
109+
Expr::Column(c) => Some(c),
110+
_ => None,
111+
}
112+
}
113+
114+
/// Look up NDV of `column` on whichever side (left or right) owns it.
115+
fn ndv_for<E: JoinCostEstimator + ?Sized>(
116+
estimator: &E,
117+
column: &Column,
118+
left: &LogicalPlan,
119+
right: &LogicalPlan,
120+
) -> Option<f64> {
121+
if left.schema().has_column(column) {
122+
estimator.cardinality(left, Some(column))
123+
} else if right.schema().has_column(column) {
124+
estimator.cardinality(right, Some(column))
125+
} else {
126+
None
127+
}
128+
}
129+
130+
fn estimate_cardinality(plan: &LogicalPlan, column: Option<&Column>) -> Result<f64> {
131+
match plan {
132+
LogicalPlan::Filter(filter) => match column {
133+
None => {
134+
let input = estimate_cardinality(&filter.input, None)?;
135+
Ok(0.1 * input)
136+
}
137+
Some(c) => {
138+
// NDV is bounded above by the input's NDV and by the
139+
// surviving row count.
140+
let ndv_in = estimate_cardinality(&filter.input, Some(c))?;
141+
let rows_out = estimate_cardinality(plan, None).unwrap_or(ndv_in);
142+
Ok(ndv_in.min(rows_out))
143+
}
144+
},
145+
LogicalPlan::Aggregate(agg) => match column {
146+
None => {
147+
let input = estimate_cardinality(&agg.input, None)?;
148+
Ok(0.1 * input)
149+
}
150+
Some(c) => {
151+
// Group-by keys are unique in the aggregate's output, so
152+
// NDV(group_key) equals the post-aggregate row count.
153+
let is_group_key = agg.group_expr.iter().any(|e| match e {
154+
Expr::Column(g) => g.name == c.name && g.relation == c.relation,
155+
_ => false,
156+
});
157+
if is_group_key {
158+
estimate_cardinality(plan, None)
159+
} else {
160+
plan_err!(
161+
"Cannot estimate NDV of non-group-by column \
162+
`{}` through Aggregate",
163+
c.name
164+
)
165+
}
166+
}
167+
},
168+
LogicalPlan::TableScan(scan) => {
169+
let stats = scan.source.statistics().ok_or_else(|| {
170+
datafusion_common::DataFusionError::Plan(format!(
171+
"TableSource for `{}` does not expose statistics",
172+
scan.table_name
173+
))
174+
})?;
175+
match column {
176+
None => match stats.num_rows {
177+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
178+
Precision::Absent => plan_err!(
179+
"TableSource for `{}` does not provide a row count",
180+
scan.table_name
181+
),
182+
},
183+
Some(c) => {
184+
// `column_statistics` is indexed by the source schema
185+
// (pre-projection), so resolve the column there.
186+
let idx = scan.source.schema().index_of(&c.name).map_err(|_| {
187+
datafusion_common::DataFusionError::Plan(format!(
188+
"Column `{}` not found in source schema of `{}`",
189+
c.name, scan.table_name
190+
))
191+
})?;
192+
let col_stats =
193+
stats.column_statistics.get(idx).ok_or_else(|| {
194+
datafusion_common::DataFusionError::Plan(format!(
195+
"Column statistics missing for index {idx} \
196+
on `{}`",
197+
scan.table_name
198+
))
199+
})?;
200+
match col_stats.distinct_count {
201+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
202+
Precision::Absent => plan_err!(
203+
"Column `{}` on `{}` has no distinct-count statistic",
204+
c.name,
205+
scan.table_name
206+
),
207+
}
208+
}
209+
}
210+
}
211+
// Semi/anti joins do not grow rows: the output cardinality is
212+
// bounded by the preserved side. We size them via the
213+
// `DEFAULT_SEMI_ANTI_SELECTIVITY` heuristic. NDV queries on the
214+
// output route to whichever side is preserved.
215+
LogicalPlan::Join(j)
216+
if matches!(
217+
j.join_type,
218+
JoinType::LeftSemi
219+
| JoinType::LeftAnti
220+
| JoinType::RightSemi
221+
| JoinType::RightAnti
222+
) =>
223+
{
224+
let preserved = match j.join_type {
225+
JoinType::LeftSemi | JoinType::LeftAnti => &j.left,
226+
_ => &j.right,
227+
};
228+
match column {
229+
None => {
230+
let rows = estimate_cardinality(preserved, None)?;
231+
Ok(rows * DEFAULT_SEMI_ANTI_SELECTIVITY)
232+
}
233+
Some(c) => estimate_cardinality(preserved, Some(c)),
234+
}
235+
}
236+
x => {
237+
let inputs = x.inputs();
238+
if inputs.len() == 1 {
239+
estimate_cardinality(inputs[0], column)
240+
} else {
241+
plan_err!("Cannot estimate cardinality for plan with multiple inputs")
242+
}
243+
}
244+
}
245+
}

0 commit comments

Comments
 (0)