Skip to content

Commit 9396f6a

Browse files
committed
Add use_statistics_registry config and registry-aware join selection
Adds a pluggable statistics path for JoinSelection that uses the StatisticsRegistry instead of each operator's built-in partition_statistics.

- Add optimizer.use_statistics_registry config flag (default=false)
- Override optimize_with_context in JoinSelection to pass the registry to should_swap_join_order when the flag is enabled; if no registry is set on SessionState, the built-in default is constructed lazily
- Add statistics_registry.slt demonstrating how the registry produces more conservative join estimates for skewed data (10*10=100 cartesian fallback vs 10*10/3=33 range-NDV estimate), triggering the correct build-side swap that the built-in estimator misses
1 parent 17b5653 commit 9396f6a

File tree

6 files changed

+387
-134
lines changed

6 files changed

+387
-134
lines changed

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,6 +1231,14 @@ config_namespace! {
12311231
/// query is used.
12321232
pub join_reordering: bool, default = true
12331233

1234+
/// When set to true, the physical plan optimizer uses the pluggable
1235+
/// [`StatisticsRegistry`] for statistics propagation across operators.
1236+
/// This enables more accurate cardinality estimates compared to each
1237+
/// operator's built-in `partition_statistics`.
1238+
///
1239+
/// [`StatisticsRegistry`]: datafusion_physical_plan::operator_statistics::StatisticsRegistry
1240+
pub use_statistics_registry: bool, default = false
1241+
12341242
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
12351243
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
12361244
pub prefer_hash_join: bool, default = true

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 73 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
//! `PartitionMode` and the build side using the available statistics for hash joins.
2525
2626
use crate::PhysicalOptimizerRule;
27+
use crate::optimizer::{ConfigOnlyContext, PhysicalOptimizerContext};
28+
use datafusion_common::Statistics;
2729
use datafusion_common::config::ConfigOptions;
2830
use datafusion_common::error::Result;
2931
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -37,6 +39,7 @@ use datafusion_physical_plan::joins::{
3739
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3840
StreamJoinPartitionMode, SymmetricHashJoinExec,
3941
};
42+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
4043
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
4144
use std::sync::Arc;
4245

@@ -53,36 +56,49 @@ impl JoinSelection {
5356
}
5457
}
5558

59+
/// Get statistics for a plan node, using the registry if available.
60+
fn get_stats(
61+
plan: &dyn ExecutionPlan,
62+
registry: Option<&StatisticsRegistry>,
63+
) -> Result<Arc<Statistics>> {
64+
if let Some(reg) = registry {
65+
reg.compute(plan)
66+
.map(|s| Arc::<Statistics>::clone(s.base_arc()))
67+
} else {
68+
plan.partition_statistics(None)
69+
}
70+
}
71+
5672
// TODO: We need performance tests for swapping Right Semi/Right Join to Left Semi/Left Join when the right side is smaller, but not much smaller.
5773
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is smaller than the other by a factor of more than SIZE_DIFFERENCE_THRESHOLD (8 by default).
5874
/// Checks whether join inputs should be swapped using available statistics.
5975
///
6076
/// It follows these steps:
61-
/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
77+
/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
78+
/// (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
79+
/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
6280
/// the left (build) side.
63-
/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
64-
/// 3. Do not reorder the join if neither statistic is available, or if
81+
/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
82+
/// 4. Do not reorder the join if neither statistic is available, or if
6583
/// `datafusion.optimizer.join_reordering` is disabled.
6684
///
67-
///
6885
/// Configuration options read from `config`:
6986
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
7087
pub(crate) fn should_swap_join_order(
7188
left: &dyn ExecutionPlan,
7289
right: &dyn ExecutionPlan,
7390
config: &ConfigOptions,
91+
registry: Option<&StatisticsRegistry>,
7492
) -> Result<bool> {
7593
if !config.optimizer.join_reordering {
7694
return Ok(false);
7795
}
7896

79-
// Get the left and right table's total bytes
80-
// If both the left and right tables contain total_byte_size statistics,
81-
// use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
82-
let left_stats = left.partition_statistics(None)?;
83-
let right_stats = right.partition_statistics(None)?;
84-
// First compare `total_byte_size` of left and right side,
85-
// if information in this field is insufficient fallback to the `num_rows`
97+
let left_stats = get_stats(left, registry)?;
98+
let right_stats = get_stats(right, registry)?;
99+
100+
// First compare total_byte_size, then fall back to num_rows if byte
101+
// sizes are unavailable.
86102
match (
87103
left_stats.total_byte_size.get_value(),
88104
right_stats.total_byte_size.get_value(),
@@ -102,8 +118,9 @@ fn supports_collect_by_thresholds(
102118
plan: &dyn ExecutionPlan,
103119
threshold_byte_size: usize,
104120
threshold_num_rows: usize,
121+
registry: Option<&StatisticsRegistry>,
105122
) -> bool {
106-
let Ok(stats) = plan.partition_statistics(None) else {
123+
let Ok(stats) = get_stats(plan, registry) else {
107124
return false;
108125
};
109126

@@ -126,31 +143,38 @@ impl PhysicalOptimizerRule for JoinSelection {
126143
plan: Arc<dyn ExecutionPlan>,
127144
config: &ConfigOptions,
128145
) -> Result<Arc<dyn ExecutionPlan>> {
129-
// First, we make pipeline-fixing modifications to joins so as to accommodate
130-
// unbounded inputs. Each pipeline-fixing subrule, which is a function
131-
// of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
132-
// argument storing state variables that indicate the unboundedness status
133-
// of the current [`ExecutionPlan`] as we traverse the plan tree.
146+
self.optimize_with_context(plan, &ConfigOnlyContext::new(config))
147+
}
148+
149+
fn optimize_with_context(
150+
&self,
151+
plan: Arc<dyn ExecutionPlan>,
152+
context: &dyn PhysicalOptimizerContext,
153+
) -> Result<Arc<dyn ExecutionPlan>> {
154+
let config = context.config_options();
155+
let mut default_registry = None;
156+
let registry: Option<&StatisticsRegistry> =
157+
if config.optimizer.use_statistics_registry {
158+
Some(
159+
context.statistics_registry().unwrap_or_else(|| {
160+
default_registry
161+
.insert(StatisticsRegistry::default_with_builtin_providers())
162+
}),
163+
)
164+
} else {
165+
None
166+
};
134167
let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
135168
Box::new(hash_join_convert_symmetric_subrule),
136169
Box::new(hash_join_swap_subrule),
137170
];
138171
let new_plan = plan
139172
.transform_up(|p| apply_subrules(p, &subrules, config))
140173
.data()?;
141-
// Next, we apply another subrule that tries to optimize joins using any
142-
// statistics their inputs might have.
143-
// - For a hash join with partition mode [`PartitionMode::Auto`], we will
144-
// make a cost-based decision to select which `PartitionMode` mode
145-
// (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
146-
// is not available, we will fall back to [`PartitionMode::Partitioned`].
147-
// - We optimize/swap join sides so that the left (build) side of the join
148-
// is the small side. If the statistics information is not available, we
149-
// do not modify join sides.
150-
// - We will also swap left and right sides for cross joins so that the left
151-
// side is the small side.
152174
new_plan
153-
.transform_up(|plan| statistical_join_selection_subrule(plan, config))
175+
.transform_up(|plan| {
176+
statistical_join_selection_subrule(plan, config, registry)
177+
})
154178
.data()
155179
}
156180

@@ -178,6 +202,7 @@ pub(crate) fn try_collect_left(
178202
hash_join: &HashJoinExec,
179203
ignore_threshold: bool,
180204
config: &ConfigOptions,
205+
registry: Option<&StatisticsRegistry>,
181206
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
182207
let left = hash_join.left();
183208
let right = hash_join.right();
@@ -188,20 +213,22 @@ pub(crate) fn try_collect_left(
188213
&**left,
189214
optimizer_config.hash_join_single_partition_threshold,
190215
optimizer_config.hash_join_single_partition_threshold_rows,
216+
registry,
191217
);
192218
let right_can_collect = ignore_threshold
193219
|| supports_collect_by_thresholds(
194220
&**right,
195221
optimizer_config.hash_join_single_partition_threshold,
196222
optimizer_config.hash_join_single_partition_threshold_rows,
223+
registry,
197224
);
198225

199226
match (left_can_collect, right_can_collect) {
200227
(true, true) => {
201228
// Don't swap null-aware anti joins as they have specific side requirements
202229
if hash_join.join_type().supports_swap()
203230
&& !hash_join.null_aware
204-
&& should_swap_join_order(&**left, &**right, config)?
231+
&& should_swap_join_order(&**left, &**right, config, registry)?
205232
{
206233
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
207234
} else {
@@ -245,13 +272,14 @@ pub(crate) fn try_collect_left(
245272
pub(crate) fn partitioned_hash_join(
246273
hash_join: &HashJoinExec,
247274
config: &ConfigOptions,
275+
registry: Option<&StatisticsRegistry>,
248276
) -> Result<Arc<dyn ExecutionPlan>> {
249277
let left = hash_join.left();
250278
let right = hash_join.right();
251279
// Don't swap null-aware anti joins as they have specific side requirements
252280
if hash_join.join_type().supports_swap()
253281
&& !hash_join.null_aware
254-
&& should_swap_join_order(&**left, &**right, config)?
282+
&& should_swap_join_order(&**left, &**right, config, registry)?
255283
{
256284
hash_join.swap_inputs(PartitionMode::Partitioned)
257285
} else {
@@ -285,26 +313,27 @@ pub(crate) fn partitioned_hash_join(
285313
fn statistical_join_selection_subrule(
286314
plan: Arc<dyn ExecutionPlan>,
287315
config: &ConfigOptions,
316+
registry: Option<&StatisticsRegistry>,
288317
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
289318
let transformed = if let Some(hash_join) = plan.downcast_ref::<HashJoinExec>() {
290319
match hash_join.partition_mode() {
291-
PartitionMode::Auto => try_collect_left(hash_join, false, config)?
292-
.map_or_else(
293-
|| partitioned_hash_join(hash_join, config).map(Some),
294-
|v| Ok(Some(v)),
295-
)?,
296-
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
297-
.map_or_else(
298-
|| partitioned_hash_join(hash_join, config).map(Some),
299-
|v| Ok(Some(v)),
300-
)?,
320+
PartitionMode::Auto => try_collect_left(hash_join, false, config, registry)?
321+
.map_or_else(
322+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
323+
|v| Ok(Some(v)),
324+
)?,
325+
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config, registry)?
326+
.map_or_else(
327+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
328+
|v| Ok(Some(v)),
329+
)?,
301330
PartitionMode::Partitioned => {
302331
let left = hash_join.left();
303332
let right = hash_join.right();
304333
// Don't swap null-aware anti joins as they have specific side requirements
305334
if hash_join.join_type().supports_swap()
306335
&& !hash_join.null_aware
307-
&& should_swap_join_order(&**left, &**right, config)?
336+
&& should_swap_join_order(&**left, &**right, config, registry)?
308337
{
309338
hash_join
310339
.swap_inputs(PartitionMode::Partitioned)
@@ -317,7 +346,7 @@ fn statistical_join_selection_subrule(
317346
} else if let Some(cross_join) = plan.downcast_ref::<CrossJoinExec>() {
318347
let left = cross_join.left();
319348
let right = cross_join.right();
320-
if should_swap_join_order(&**left, &**right, config)? {
349+
if should_swap_join_order(&**left, &**right, config, registry)? {
321350
cross_join.swap_inputs().map(Some)?
322351
} else {
323352
None
@@ -326,7 +355,7 @@ fn statistical_join_selection_subrule(
326355
let left = nl_join.left();
327356
let right = nl_join.right();
328357
if nl_join.join_type().supports_swap()
329-
&& should_swap_join_order(&**left, &**right, config)?
358+
&& should_swap_join_order(&**left, &**right, config, registry)?
330359
{
331360
nl_join.swap_inputs().map(Some)?
332361
} else {

0 commit comments

Comments
 (0)