Add use_statistics_registry config and registry-aware join selection

Adds a pluggable statistics path for JoinSelection that uses the
StatisticsRegistry instead of each operator's built-in partition_statistics.

- Add optimizer.use_statistics_registry config flag (default=false)
- Override optimize_with_context in JoinSelection to pass the registry
  to should_swap_join_order when the flag is enabled; if no registry is
  set on SessionState the built-in default is constructed lazily
- Add statistics_registry.slt demonstrating how the registry produces
  more conservative join estimates for skewed data (10*10=100 cartesian
  fallback vs 10*10/3=33 range-NDV estimate), triggering the correct
  build-side swap that the built-in estimator misses

style: fix prettier formatting in configs.md
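
The two figures quoted in the commit message (100 and 33) can be reproduced with a small sketch. The commit message does not spell out the formula, so this assumes the textbook equi-join estimate of `left_rows * right_rows / ndv`, falling back to the cartesian product when no distinct-value count is known. The function name `estimate_join_rows` and its parameters are hypothetical and are not part of DataFusion.

```rust
/// Hypothetical helper (not a DataFusion API): estimate the output row
/// count of an equi-join.
///
/// `ndv` is the number of distinct join-key values, if known. When it is
/// unknown (or zero), the estimate falls back to the cartesian product.
fn estimate_join_rows(left_rows: u64, right_rows: u64, ndv: Option<u64>) -> u64 {
    // Upper bound: every left row matches every right row.
    let cartesian = left_rows.saturating_mul(right_rows);
    match ndv {
        // Assumed textbook formula: |L| * |R| / NDV (integer division).
        Some(n) if n > 0 => cartesian / n,
        // No usable NDV: keep the conservative cartesian estimate.
        _ => cartesian,
    }
}
```

With 10 rows on each side, `estimate_join_rows(10, 10, None)` gives the cartesian figure and `estimate_join_rows(10, 10, Some(3))` gives the NDV-based figure. The larger estimate makes that join input look more expensive as a build side, which is how a more conservative estimate can change the swap decision.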
+use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::sync::Arc;
@@ -53,36 +56,49 @@ impl JoinSelection {
     }
 }

+/// Get statistics for a plan node, using the registry if available.
+fn get_stats(
+    plan: &dyn ExecutionPlan,
+    registry: Option<&StatisticsRegistry>,
+) -> Result<Arc<Statistics>> {
+    if let Some(reg) = registry {
+        reg.compute(plan)
+            .map(|s| Arc::<Statistics>::clone(s.base_arc()))
+    } else {
+        plan.partition_statistics(None)
+    }
+}
+
 // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
 // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
 /// Checks whether join inputs should be swapped using available statistics.
 ///
 /// It follows these steps:
-/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
+/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
+///    (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
+/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
 ///    the left (build) side.
-/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
-/// 3. Do not reorder the join if neither statistic is available, or if
+/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
+/// 4. Do not reorder the join if neither statistic is available, or if
 ///    `datafusion.optimizer.join_reordering` is disabled.
 ///
-///
 /// Used configurations inside arg `config`
 /// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
 pub(crate) fn should_swap_join_order(
     left: &dyn ExecutionPlan,
     right: &dyn ExecutionPlan,
     config: &ConfigOptions,
+    registry: Option<&StatisticsRegistry>,
 ) -> Result<bool> {
     if !config.optimizer.join_reordering {
         return Ok(false);
     }

-    // Get the left and right table's total bytes
-    // If both the left and right tables contain total_byte_size statistics,
-    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
-    let left_stats = left.partition_statistics(None)?;
-    let right_stats = right.partition_statistics(None)?;
-    // First compare `total_byte_size` of left and right side,
-    // if information in this field is insufficient fallback to the `num_rows`
+    let left_stats = get_stats(left, registry)?;
+    let right_stats = get_stats(right, registry)?;
+
+    // First compare total_byte_size, then fall back to num_rows if byte
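
The diff is cut off at this point. The decision procedure that the doc comment on `should_swap_join_order` describes can be sketched in isolation as follows. This is a simplified illustration, not the DataFusion implementation: `SimpleStats`, `resolve_stats`, and `should_swap` are hypothetical names, plain `Option<u64>` stands in for DataFusion's statistics types, and the strict `left > right` comparison is an assumption about how "place the smaller side on the left" is decided.

```rust
/// Hypothetical, simplified stand-in for DataFusion's `Statistics`.
/// `None` plays the role of an absent statistic.
#[derive(Debug, Clone, Copy, Default)]
struct SimpleStats {
    total_byte_size: Option<u64>,
    num_rows: Option<u64>,
}

/// Mirrors the idea of `get_stats`: prefer the registry's estimate when
/// one is supplied, otherwise use the operator's built-in statistics.
fn resolve_stats(builtin: SimpleStats, from_registry: Option<SimpleStats>) -> SimpleStats {
    from_registry.unwrap_or(builtin)
}

/// Returns true when the inputs should be swapped so that the smaller
/// side ends up on the left (build) side.
fn should_swap(left: SimpleStats, right: SimpleStats, join_reordering: bool) -> bool {
    // Reordering disabled: never swap.
    if !join_reordering {
        return false;
    }
    match (left.total_byte_size, right.total_byte_size) {
        // Both byte sizes known: swap if the left side is larger.
        (Some(l), Some(r)) => l > r,
        // Otherwise fall back to row counts.
        _ => match (left.num_rows, right.num_rows) {
            (Some(l), Some(r)) => l > r,
            // Neither statistic is available on both sides: do not reorder.
            _ => false,
        },
    }
}
```

For example, if the left input is an intermediate join whose built-in statistics are absent, `should_swap` returns `false`. Passing a registry-provided estimate through `resolve_stats` first gives the comparison something to work with, which is the situation step 1 of the doc comment describes.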