Skip to content

Commit a38d3ae

Browse files
committed
Add use_statistics_registry config and registry-aware join selection
Adds a pluggable statistics path for JoinSelection that uses the StatisticsRegistry instead of each operator's built-in partition_statistics.

- Add the optimizer.use_statistics_registry config flag (default=false)
- Always initialize a StatisticsRegistry with built-in providers in SessionStateBuilder (unless a custom registry was explicitly set), so that enabling the flag via SQL SET takes effect without a rebuild
- Override optimize_with_context in JoinSelection to pass the registry to should_swap_join_order when the flag is enabled
- Add statistics_registry.slt demonstrating how the registry produces more conservative join estimates for skewed data (10*10=100 cartesian fallback vs 10*10/3=33 range-NDV estimate), triggering the correct build-side swap that the built-in estimator misses
1 parent af31fab commit a38d3ae

7 files changed

Lines changed: 348 additions & 32 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,14 @@ config_namespace! {
11001100
/// query is used.
11011101
pub join_reordering: bool, default = true
11021102

1103+
/// When set to true, the physical plan optimizer uses the pluggable
1104+
/// [`StatisticsRegistry`] for statistics propagation across operators.
1105+
/// This enables more accurate cardinality estimates compared to each
1106+
/// operator's built-in `partition_statistics`.
1107+
///
1108+
/// [`StatisticsRegistry`]: datafusion_physical_plan::operator_statistics::StatisticsRegistry
1109+
pub use_statistics_registry: bool, default = false
1110+
11031111
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
11041112
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
11051113
pub prefer_hash_join: bool, default = true

datafusion/core/src/execution/session_state.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1598,7 +1598,11 @@ impl SessionStateBuilder {
15981598
runtime_env,
15991599
function_factory,
16001600
cache_factory,
1601-
statistics_registry,
1601+
// Always build a default registry with built-in providers when no custom
1602+
// registry was explicitly set; it is only consulted when
1603+
// use_statistics_registry = true at optimization time.
1604+
statistics_registry: statistics_registry
1605+
.or_else(|| Some(StatisticsRegistry::default_with_builtin_providers())),
16021606
prepared_plans: HashMap::new(),
16031607
};
16041608

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
//! pipeline-friendly ones. To achieve the second goal, it selects the proper
2424
//! `PartitionMode` and the build side using the available statistics for hash joins.
2525
26+
use crate::optimizer::PhysicalOptimizerContext;
2627
use crate::PhysicalOptimizerRule;
2728
use datafusion_common::config::ConfigOptions;
2829
use datafusion_common::error::Result;
30+
use datafusion_common::Statistics;
2931
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3032
use datafusion_common::{JoinSide, JoinType, internal_err};
3133
use datafusion_expr_common::sort_properties::SortProperties;
@@ -37,6 +39,7 @@ use datafusion_physical_plan::joins::{
3739
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3840
StreamJoinPartitionMode, SymmetricHashJoinExec,
3941
};
42+
use datafusion_physical_plan::operator_statistics::{StatsCache, StatisticsRegistry};
4043
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
4144
use std::sync::Arc;
4245

@@ -53,36 +56,50 @@ impl JoinSelection {
5356
}
5457
}
5558

59+
/// Get statistics for a plan node, using the registry if available.
60+
fn get_stats(
61+
plan: &dyn ExecutionPlan,
62+
registry: Option<&StatisticsRegistry>,
63+
cache: &mut StatsCache,
64+
) -> Result<Arc<Statistics>> {
65+
if let Some(reg) = registry {
66+
reg.compute_cached(plan, cache).map(|s| s.base_arc().clone())
67+
} else {
68+
plan.partition_statistics(None)
69+
}
70+
}
71+
5672
// TODO: We need a performance test for swapping Right Semi/Right joins to Left Semi/Left joins when the right side is smaller, but not much smaller.
5773
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is smaller than the other by more than a factor of SIZE_DIFFERENCE_THRESHOLD (8 by default).
5874
/// Checks whether join inputs should be swapped using available statistics.
5975
///
6076
/// It follows these steps:
61-
/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
77+
/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
78+
/// (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
79+
/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
6280
/// the left (build) side.
63-
/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
64-
/// 3. Do not reorder the join if neither statistic is available, or if
81+
/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
82+
/// 4. Do not reorder the join if neither statistic is available, or if
6583
/// `datafusion.optimizer.join_reordering` is disabled.
6684
///
67-
///
6885
/// Configuration options read from `config`:
6986
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
7087
pub(crate) fn should_swap_join_order(
7188
left: &dyn ExecutionPlan,
7289
right: &dyn ExecutionPlan,
7390
config: &ConfigOptions,
91+
registry: Option<&StatisticsRegistry>,
92+
cache: &mut StatsCache,
7493
) -> Result<bool> {
7594
if !config.optimizer.join_reordering {
7695
return Ok(false);
7796
}
7897

79-
// Get the left and right table's total bytes
80-
// If both the left and right tables contain total_byte_size statistics,
81-
// use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
82-
let left_stats = left.partition_statistics(None)?;
83-
let right_stats = right.partition_statistics(None)?;
84-
// First compare `total_byte_size` of left and right side,
85-
// if information in this field is insufficient fallback to the `num_rows`
98+
let left_stats = get_stats(left, registry, cache)?;
99+
let right_stats = get_stats(right, registry, cache)?;
100+
101+
// First compare total_byte_size, then fall back to num_rows if byte
102+
// sizes are unavailable.
86103
match (
87104
left_stats.total_byte_size.get_value(),
88105
right_stats.total_byte_size.get_value(),
@@ -102,8 +119,10 @@ fn supports_collect_by_thresholds(
102119
plan: &dyn ExecutionPlan,
103120
threshold_byte_size: usize,
104121
threshold_num_rows: usize,
122+
registry: Option<&StatisticsRegistry>,
123+
cache: &mut StatsCache,
105124
) -> bool {
106-
let Ok(stats) = plan.partition_statistics(None) else {
125+
let Ok(stats) = get_stats(plan, registry, cache) else {
107126
return false;
108127
};
109128

@@ -149,8 +168,38 @@ impl PhysicalOptimizerRule for JoinSelection {
149168
// do not modify join sides.
150169
// - We will also swap left and right sides for cross joins so that the left
151170
// side is the small side.
171+
let mut cache = StatsCache::new();
172+
new_plan
173+
.transform_up(|plan| statistical_join_selection_subrule(plan, config, None, &mut cache))
174+
.data()
175+
}
176+
177+
// TODO: this intentionally duplicates the subrule pipeline from optimize()
178+
// to reduce the change surface; once optimize_with_context fully replaces
179+
// optimize, the duplication goes away.
180+
fn optimize_with_context(
181+
&self,
182+
plan: Arc<dyn ExecutionPlan>,
183+
context: &dyn PhysicalOptimizerContext,
184+
) -> Result<Arc<dyn ExecutionPlan>> {
185+
let config = context.config_options();
186+
let registry = if config.optimizer.use_statistics_registry {
187+
context.statistics_registry()
188+
} else {
189+
None
190+
};
191+
let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
192+
Box::new(hash_join_convert_symmetric_subrule),
193+
Box::new(hash_join_swap_subrule),
194+
];
195+
let new_plan = plan
196+
.transform_up(|p| apply_subrules(p, &subrules, config))
197+
.data()?;
198+
let mut cache = StatsCache::new();
152199
new_plan
153-
.transform_up(|plan| statistical_join_selection_subrule(plan, config))
200+
.transform_up(|plan| {
201+
statistical_join_selection_subrule(plan, config, registry, &mut cache)
202+
})
154203
.data()
155204
}
156205

@@ -178,6 +227,8 @@ pub(crate) fn try_collect_left(
178227
hash_join: &HashJoinExec,
179228
ignore_threshold: bool,
180229
config: &ConfigOptions,
230+
registry: Option<&StatisticsRegistry>,
231+
cache: &mut StatsCache,
181232
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
182233
let left = hash_join.left();
183234
let right = hash_join.right();
@@ -188,20 +239,24 @@ pub(crate) fn try_collect_left(
188239
&**left,
189240
optimizer_config.hash_join_single_partition_threshold,
190241
optimizer_config.hash_join_single_partition_threshold_rows,
242+
registry,
243+
cache,
191244
);
192245
let right_can_collect = ignore_threshold
193246
|| supports_collect_by_thresholds(
194247
&**right,
195248
optimizer_config.hash_join_single_partition_threshold,
196249
optimizer_config.hash_join_single_partition_threshold_rows,
250+
registry,
251+
cache,
197252
);
198253

199254
match (left_can_collect, right_can_collect) {
200255
(true, true) => {
201256
// Don't swap null-aware anti joins as they have specific side requirements
202257
if hash_join.join_type().supports_swap()
203258
&& !hash_join.null_aware
204-
&& should_swap_join_order(&**left, &**right, config)?
259+
&& should_swap_join_order(&**left, &**right, config, registry, cache)?
205260
{
206261
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
207262
} else {
@@ -245,13 +300,15 @@ pub(crate) fn try_collect_left(
245300
pub(crate) fn partitioned_hash_join(
246301
hash_join: &HashJoinExec,
247302
config: &ConfigOptions,
303+
registry: Option<&StatisticsRegistry>,
304+
cache: &mut StatsCache,
248305
) -> Result<Arc<dyn ExecutionPlan>> {
249306
let left = hash_join.left();
250307
let right = hash_join.right();
251308
// Don't swap null-aware anti joins as they have specific side requirements
252309
if hash_join.join_type().supports_swap()
253310
&& !hash_join.null_aware
254-
&& should_swap_join_order(&**left, &**right, config)?
311+
&& should_swap_join_order(&**left, &**right, config, registry, cache)?
255312
{
256313
hash_join.swap_inputs(PartitionMode::Partitioned)
257314
} else {
@@ -285,27 +342,31 @@ pub(crate) fn partitioned_hash_join(
285342
fn statistical_join_selection_subrule(
286343
plan: Arc<dyn ExecutionPlan>,
287344
config: &ConfigOptions,
345+
registry: Option<&StatisticsRegistry>,
346+
cache: &mut StatsCache,
288347
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
289348
let transformed =
290349
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
291350
match hash_join.partition_mode() {
292-
PartitionMode::Auto => try_collect_left(hash_join, false, config)?
351+
PartitionMode::Auto => try_collect_left(hash_join, false, config, registry, cache)?
293352
.map_or_else(
294-
|| partitioned_hash_join(hash_join, config).map(Some),
295-
|v| Ok(Some(v)),
296-
)?,
297-
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
298-
.map_or_else(
299-
|| partitioned_hash_join(hash_join, config).map(Some),
353+
|| partitioned_hash_join(hash_join, config, registry, cache).map(Some),
300354
|v| Ok(Some(v)),
301355
)?,
356+
PartitionMode::CollectLeft => {
357+
try_collect_left(hash_join, true, config, registry, cache)?
358+
.map_or_else(
359+
|| partitioned_hash_join(hash_join, config, registry, cache).map(Some),
360+
|v| Ok(Some(v)),
361+
)?
362+
}
302363
PartitionMode::Partitioned => {
303364
let left = hash_join.left();
304365
let right = hash_join.right();
305366
// Don't swap null-aware anti joins as they have specific side requirements
306367
if hash_join.join_type().supports_swap()
307368
&& !hash_join.null_aware
308-
&& should_swap_join_order(&**left, &**right, config)?
369+
&& should_swap_join_order(&**left, &**right, config, registry, cache)?
309370
{
310371
hash_join
311372
.swap_inputs(PartitionMode::Partitioned)
@@ -318,7 +379,7 @@ fn statistical_join_selection_subrule(
318379
} else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
319380
let left = cross_join.left();
320381
let right = cross_join.right();
321-
if should_swap_join_order(&**left, &**right, config)? {
382+
if should_swap_join_order(&**left, &**right, config, registry, cache)? {
322383
cross_join.swap_inputs().map(Some)?
323384
} else {
324385
None
@@ -327,7 +388,7 @@ fn statistical_join_selection_subrule(
327388
let left = nl_join.left();
328389
let right = nl_join.right();
329390
if nl_join.join_type().supports_swap()
330-
&& should_swap_join_order(&**left, &**right, config)?
391+
&& should_swap_join_order(&**left, &**right, config, registry, cache)?
331392
{
332393
nl_join.swap_inputs().map(Some)?
333394
} else {

datafusion/physical-plan/src/operator_statistics/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -947,8 +947,10 @@ impl StatisticsProvider for JoinStatisticsProvider {
947947
Precision::Inexact(estimated)
948948
};
949949

950-
// TODO: once #20184 lands, pass enhanced child_stats to partition_statistics
951-
// so column-level stats (NDV, min/max) propagate through the registry walk.
950+
// TODO: column-level stats (NDV, min/max) enriched by the registry walk
951+
// are lost here because partition_statistics(None) re-fetches raw child
952+
// stats internally. Once #20184 lands, pass enhanced child_stats so the
953+
// operator's built-in column mapping uses them instead.
952954
let mut base = Arc::unwrap_or_clone(plan.partition_statistics(None)?);
953955
rescale_byte_size(&mut base, num_rows);
954956
Ok(StatisticsResult::Computed(ExtendedStatistics::new(base)))
@@ -1001,8 +1003,10 @@ impl StatisticsProvider for LimitStatisticsProvider {
10011003
},
10021004
};
10031005

1004-
// TODO: once #20184 lands, pass enhanced child_stats to partition_statistics
1005-
// so column-level stats (NDV, min/max) propagate through the registry walk.
1006+
// TODO: column-level stats (NDV, min/max) enriched by the registry walk
1007+
// are lost here because partition_statistics(None) re-fetches raw child
1008+
// stats internally. Once #20184 lands, pass enhanced child_stats so the
1009+
// operator's built-in column mapping uses them instead.
10061010
let mut base = Arc::unwrap_or_clone(plan.partition_statistics(None)?);
10071011
rescale_byte_size(&mut base, num_rows);
10081012
Ok(StatisticsResult::Computed(ExtendedStatistics::new(base)))
@@ -1044,8 +1048,10 @@ impl StatisticsProvider for UnionStatisticsProvider {
10441048
},
10451049
)?;
10461050

1047-
// TODO: once #20184 lands, pass enhanced child_stats to partition_statistics
1048-
// so column-level stats (NDV, min/max) propagate through the registry walk.
1051+
// TODO: column-level stats (NDV, min/max) enriched by the registry walk
1052+
// are lost here because partition_statistics(None) re-fetches raw child
1053+
// stats internally. Once #20184 lands, pass enhanced child_stats so the
1054+
// operator's built-in column mapping uses them instead.
10491055
let mut base = Arc::unwrap_or_clone(plan.partition_statistics(None)?);
10501056
rescale_byte_size(&mut base, total);
10511057
Ok(StatisticsResult::Computed(ExtendedStatistics::new(base)))

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072
315315
datafusion.optimizer.hash_join_single_partition_threshold 1048576
316316
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
317317
datafusion.optimizer.join_reordering true
318+
datafusion.optimizer.use_statistics_registry false
318319
datafusion.optimizer.max_passes 3
319320
datafusion.optimizer.prefer_existing_sort false
320321
datafusion.optimizer.prefer_existing_union false
@@ -458,6 +459,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in b
458459
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
459460
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
460461
datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used.
462+
datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable StatisticsRegistry for statistics propagation across operators. This enables more accurate cardinality estimates compared to each operator's built-in partition_statistics.
461463
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
462464
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
463465
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave

0 commit comments

Comments
 (0)