Skip to content

Commit 4043d86

Browse files
committed
add statistics
1 parent 3cea38b commit 4043d86

4 files changed

Lines changed: 153 additions & 28 deletions

File tree

datafusion/catalog/src/default_table_source.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use crate::TableProvider;
2424

2525
use arrow::datatypes::SchemaRef;
26-
use datafusion_common::{Constraints, internal_err};
26+
use datafusion_common::{Constraints, Statistics, internal_err};
2727
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
2828

2929
/// Implements [`TableSource`] for a [`TableProvider`]
@@ -77,6 +77,10 @@ impl TableSource for DefaultTableSource {
7777
fn get_column_default(&self, column: &str) -> Option<&Expr> {
7878
self.table_provider.get_column_default(column)
7979
}
80+
81+
fn statistics(&self) -> Option<Statistics> {
82+
self.table_provider.statistics()
83+
}
8084
}
8185

8286
/// Wrap TableProvider in TableSource

datafusion/expr/src/table_source.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::{Expr, LogicalPlan};
2121

2222
use arrow::datatypes::SchemaRef;
23-
use datafusion_common::{Constraints, Result};
23+
use datafusion_common::{Constraints, Result, Statistics};
2424

2525
use std::{any::Any, borrow::Cow};
2626

@@ -127,6 +127,14 @@ pub trait TableSource: Any + Sync + Send {
127127
fn get_column_default(&self, _column: &str) -> Option<&Expr> {
128128
None
129129
}
130+
131+
/// Get statistics for this table, if available
132+
/// Although not presently used in mainline DataFusion, this allows implementation specific
133+
/// behavior for downstream repositories, in conjunction with specialized optimizer rules to
134+
/// perform operations such as re-ordering of joins.
135+
fn statistics(&self) -> Option<Statistics> {
136+
None
137+
}
130138
}
131139

132140
impl dyn TableSource {

datafusion/optimizer/src/reorder_join/cost.rs

Lines changed: 134 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,45 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_common::{Result, plan_err};
19-
use datafusion_expr::{JoinType, LogicalPlan};
18+
use datafusion_common::{Column, Result, plan_err, stats::Precision};
19+
use datafusion_expr::{Expr, JoinType, LogicalPlan};
2020

2121
use super::join_graph::Edge;
2222

2323
pub trait JoinCostEstimator: std::fmt::Debug {
24-
fn cardinality(&self, plan: &LogicalPlan) -> Option<f64> {
25-
estimate_cardinality(plan).ok()
24+
/// Cardinality of `plan`.
25+
///
26+
/// - `column = None`: number of output rows of `plan`.
27+
/// - `column = Some(c)`: number of distinct values of column `c`
28+
/// in `plan`'s output (NDV).
29+
fn cardinality(&self, plan: &LogicalPlan, column: Option<&Column>) -> Option<f64> {
30+
estimate_cardinality(plan, column).ok()
2631
}
2732

28-
fn selectivity(&self, edge: &Edge) -> f64 {
29-
match edge.join_type {
33+
/// Estimated selectivity of joining `left` with `right` via `edge`.
34+
///
35+
/// Default: `1 / max(NDV(left.key), NDV(right.key))` for inner equi-joins
36+
/// when both NDVs are available; otherwise a per-join-type constant.
37+
fn selectivity(&self, edge: &Edge, left: &LogicalPlan, right: &LogicalPlan) -> f64 {
38+
let fallback = match edge.join_type {
3039
JoinType::Inner => 0.1,
3140
_ => 1.0,
41+
};
42+
if edge.join_type != JoinType::Inner || edge.on.is_empty() {
43+
return fallback;
44+
}
45+
// Use only the first equi-pair. Compounding pairwise selectivities
46+
// under independence assumptions overestimates selectivity when
47+
// composite-key columns are correlated, which is the common case.
48+
let (a, b) = &edge.on[0];
49+
let (Some(col_a), Some(col_b)) = (key_column(a), key_column(b)) else {
50+
return fallback;
51+
};
52+
let ndv_a = ndv_for(self, col_a, left, right);
53+
let ndv_b = ndv_for(self, col_b, left, right);
54+
match (ndv_a, ndv_b) {
55+
(Some(a), Some(b)) if a.max(b) > 0.0 => 1.0 / a.max(b),
56+
_ => fallback,
3257
}
3358
}
3459

@@ -43,29 +68,114 @@ pub struct DefaultCostEstimator;
4368

4469
impl JoinCostEstimator for DefaultCostEstimator {}
4570

46-
fn estimate_cardinality(plan: &LogicalPlan) -> Result<f64> {
71+
fn key_column(expr: &Expr) -> Option<&Column> {
72+
match expr {
73+
Expr::Column(c) => Some(c),
74+
_ => None,
75+
}
76+
}
77+
78+
/// Look up NDV of `column` on whichever side (left or right) owns it.
79+
fn ndv_for<E: JoinCostEstimator + ?Sized>(
80+
estimator: &E,
81+
column: &Column,
82+
left: &LogicalPlan,
83+
right: &LogicalPlan,
84+
) -> Option<f64> {
85+
if left.schema().has_column(column) {
86+
estimator.cardinality(left, Some(column))
87+
} else if right.schema().has_column(column) {
88+
estimator.cardinality(right, Some(column))
89+
} else {
90+
None
91+
}
92+
}
93+
94+
fn estimate_cardinality(plan: &LogicalPlan, column: Option<&Column>) -> Result<f64> {
4795
match plan {
48-
LogicalPlan::Filter(filter) => {
49-
let input_cardinality = estimate_cardinality(&filter.input)?;
50-
Ok(0.1 * input_cardinality)
51-
}
52-
LogicalPlan::Aggregate(agg) => {
53-
let input_cardinality = estimate_cardinality(&agg.input)?;
54-
Ok(0.1 * input_cardinality)
55-
}
56-
LogicalPlan::TableScan(_) => {
57-
// The logical-plan-level `TableSource` trait doesn't expose row
58-
// statistics. To use cardinalities for base relations, override
59-
// `JoinCostEstimator::cardinality`.
60-
plan_err!(
61-
"Default JoinCostEstimator cannot size TableScan; \
62-
override `cardinality` to provide statistics"
63-
)
96+
LogicalPlan::Filter(filter) => match column {
97+
None => {
98+
let input = estimate_cardinality(&filter.input, None)?;
99+
Ok(0.1 * input)
100+
}
101+
Some(c) => {
102+
// NDV is bounded above by the input's NDV and by the
103+
// surviving row count.
104+
let ndv_in = estimate_cardinality(&filter.input, Some(c))?;
105+
let rows_out = estimate_cardinality(plan, None).unwrap_or(ndv_in);
106+
Ok(ndv_in.min(rows_out))
107+
}
108+
},
109+
LogicalPlan::Aggregate(agg) => match column {
110+
None => {
111+
let input = estimate_cardinality(&agg.input, None)?;
112+
Ok(0.1 * input)
113+
}
114+
Some(c) => {
115+
// Group-by keys are unique in the aggregate's output, so
116+
// NDV(group_key) equals the post-aggregate row count.
117+
let is_group_key = agg.group_expr.iter().any(|e| match e {
118+
Expr::Column(g) => g.name == c.name && g.relation == c.relation,
119+
_ => false,
120+
});
121+
if is_group_key {
122+
estimate_cardinality(plan, None)
123+
} else {
124+
plan_err!(
125+
"Cannot estimate NDV of non-group-by column \
126+
`{}` through Aggregate",
127+
c.name
128+
)
129+
}
130+
}
131+
},
132+
LogicalPlan::TableScan(scan) => {
133+
let stats = scan.source.statistics().ok_or_else(|| {
134+
datafusion_common::DataFusionError::Plan(format!(
135+
"TableSource for `{}` does not expose statistics",
136+
scan.table_name
137+
))
138+
})?;
139+
match column {
140+
None => match stats.num_rows {
141+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
142+
Precision::Absent => plan_err!(
143+
"TableSource for `{}` does not provide a row count",
144+
scan.table_name
145+
),
146+
},
147+
Some(c) => {
148+
// `column_statistics` is indexed by the source schema
149+
// (pre-projection), so resolve the column there.
150+
let idx = scan.source.schema().index_of(&c.name).map_err(|_| {
151+
datafusion_common::DataFusionError::Plan(format!(
152+
"Column `{}` not found in source schema of `{}`",
153+
c.name, scan.table_name
154+
))
155+
})?;
156+
let col_stats =
157+
stats.column_statistics.get(idx).ok_or_else(|| {
158+
datafusion_common::DataFusionError::Plan(format!(
159+
"Column statistics missing for index {idx} \
160+
on `{}`",
161+
scan.table_name
162+
))
163+
})?;
164+
match col_stats.distinct_count {
165+
Precision::Exact(n) | Precision::Inexact(n) => Ok(n as f64),
166+
Precision::Absent => plan_err!(
167+
"Column `{}` on `{}` has no distinct-count statistic",
168+
c.name,
169+
scan.table_name
170+
),
171+
}
172+
}
173+
}
64174
}
65175
x => {
66176
let inputs = x.inputs();
67177
if inputs.len() == 1 {
68-
estimate_cardinality(inputs[0])
178+
estimate_cardinality(inputs[0], column)
69179
} else {
70180
plan_err!("Cannot estimate cardinality for plan with multiple inputs")
71181
}

datafusion/optimizer/src/reorder_join/left_deep_join_plan.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ impl<'graph> PrecedenceTreeNode<'graph> {
207207
let node = query_graph
208208
.get_node(node_id)
209209
.ok_or_else(|| plan_datafusion_err!("Root node not found"))?;
210-
let input_cardinality = cost_estimator.cardinality(&node.plan).unwrap_or(1.0);
210+
let input_cardinality =
211+
cost_estimator.cardinality(&node.plan, None).unwrap_or(1.0);
211212

212213
let children = node
213214
.connections()
@@ -220,7 +221,9 @@ impl<'graph> PrecedenceTreeNode<'graph> {
220221
.find(|x| *x != node_id && remaining.contains(x))?;
221222

222223
remaining.remove(&other);
223-
let child_selectivity = cost_estimator.selectivity(edge);
224+
let other_plan = &query_graph.get_node(other)?.plan;
225+
let child_selectivity =
226+
cost_estimator.selectivity(edge, &node.plan, other_plan);
224227
Some(PrecedenceTreeNode::from_query_node(
225228
other,
226229
child_selectivity,

0 commit comments

Comments
 (0)