Skip to content

Commit 11c2fbc

Browse files
authored
Misc minor optimization in the Physical Optimizer (#21216)
## Which issue does this PR close? - Closes #. ## Rationale for this change Similar to #21128, just trying to shave time off the optimizer. Locally, it improves some sql-planner benchmarks by up to 10%, but they seem relatively noisy on my laptop. ## What changes are included in this PR? 1. Avoid allocating `plan.children()` in a loop in `sort_pushdown.rs`. 2. Try to avoid some expensive tree rewrites in `join_selection.rs`. 3. Avoid deep clones of exec limit nodes in `limit_pushdown.rs`, and only mutate the plan if it was actually changed. 4. Use a cheaper code path to change the limit on an `AggregateExec` in `limited_distinct_aggregation.rs`. 5. Use a read-only traversal in `sanity_checker.rs`. It's read-only, and `transform_up` is always more expensive. I've considered extending `TreeNode`, but this seems to be basically the only place in the codebase that does something like this. There are a few places where we unconditionally return `Transformed::yes`, which might have unintended downstream consequences because it breaks pointer equality, and I think it also just ends up allocating more memory — but they are harder to untangle, so I'll try to do them in follow-ups. ## Are these changes tested? One new test for limits; otherwise the existing tests. ## Are there any user-facing changes? Removes the `LimitExec` type — I can't imagine why someone would use it, and it's only used in one place. Happy to bring it back as a deprecated type. --------- Signed-off-by: Adam Gutglick <adamgsal@gmail.com>
1 parent 2c08ddb commit 11c2fbc

File tree

10 files changed

+135
-105
lines changed

10 files changed

+135
-105
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,6 @@ rat.txt
7575

7676
# data generated by examples
7777
datafusion-examples/examples/datafusion-examples/
78+
79+
# Samply profile data
80+
profile.json.gz

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,8 @@ async fn test_union_inputs_different_sorted_with_limit() -> Result<()> {
12561256
let physical_plan = sort_preserving_merge_exec(ordering3, union);
12571257

12581258
let test = EnforceSortingTest::new(physical_plan).with_repartition_sorts(true);
1259-
// Should not change the unnecessarily fine `SortExec`s because there is `LimitExec`
1259+
// Should not change the unnecessarily fine `SortExec`s because there are
1260+
// explicit limit nodes above the second sort.
12601261
assert_snapshot!(test.run(), @r"
12611262
Input Plan:
12621263
SortPreservingMergeExec: [nullable_col@0 ASC]

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,3 +682,38 @@ fn preserves_skip_before_sort() -> Result<()> {
682682

683683
Ok(())
684684
}
685+
686+
#[test]
687+
fn no_limit_preserves_plan_identity() -> Result<()> {
688+
// When there is no limit in the plan, the optimizer should return the
689+
// exact same Arc (pointer-equal) for every node, avoiding unnecessary
690+
// plan reconstruction and property recomputation.
691+
let schema = create_schema();
692+
693+
let left = empty_exec(Arc::clone(&schema));
694+
let right = empty_exec(Arc::clone(&schema));
695+
let on = join_on_columns("c1", "c1");
696+
let join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
697+
let plan = filter_exec(Arc::clone(&schema), join)?;
698+
699+
let optimized =
700+
LimitPushdown::new().optimize(Arc::clone(&plan), &ConfigOptions::new())?;
701+
702+
assert!(
703+
Arc::ptr_eq(&plan, &optimized),
704+
"Expected optimizer to return the same Arc when no limit is present"
705+
);
706+
707+
let optimized = format_plan(&optimized);
708+
insta::assert_snapshot!(
709+
optimized,
710+
@r"
711+
FilterExec: c3@2 > 0
712+
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
713+
EmptyExec
714+
EmptyExec
715+
"
716+
);
717+
718+
Ok(())
719+
}

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,8 +548,9 @@ pub fn ensure_sorting(
548548

549549
/// Analyzes if there are any immediate sort removals by checking the `SortExec`s
550550
/// and their ordering requirement satisfactions with children
551-
/// If the sort is unnecessary, either replaces it with [`SortPreservingMergeExec`]/`LimitExec`
552-
/// or removes the [`SortExec`].
551+
/// If the sort is unnecessary, either replaces it with
552+
/// [`SortPreservingMergeExec`] and/or a limit node, or removes the
553+
/// [`SortExec`].
553554
/// Otherwise, returns the original plan
554555
fn analyze_immediate_sort_removal(
555556
mut node: PlanWithCorrespondingSort,

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -680,8 +680,10 @@ fn handle_custom_pushdown(
680680
parent_required: OrderingRequirements,
681681
maintains_input_order: &[bool],
682682
) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
683+
let plan_children = plan.children();
684+
683685
// If the plan has no children, return early:
684-
if plan.children().is_empty() {
686+
if plan_children.is_empty() {
685687
return Ok(None);
686688
}
687689

@@ -699,8 +701,7 @@ fn handle_custom_pushdown(
699701
.collect();
700702

701703
// Get the number of fields in each child's schema:
702-
let children_schema_lengths: Vec<usize> = plan
703-
.children()
704+
let children_schema_lengths: Vec<usize> = plan_children
704705
.iter()
705706
.map(|c| c.schema().fields().len())
706707
.collect();
@@ -734,7 +735,7 @@ fn handle_custom_pushdown(
734735
let updated_parent_req = requirement
735736
.into_iter()
736737
.map(|req| {
737-
let child_schema = plan.children()[maintained_child_idx].schema();
738+
let child_schema = plan_children[maintained_child_idx].schema();
738739
let updated_columns = req
739740
.expr
740741
.transform_up(|expr| {
@@ -809,13 +810,15 @@ fn handle_hash_join(
809810

810811
let all_from_right_child = all_indices.iter().all(|i| *i >= len_of_left_fields);
811812

813+
let plan_children = plan.children();
814+
812815
// If all columns are from the right child, update the parent requirements
813816
if all_from_right_child {
814817
// Transform the parent-required expression for the child schema by adjusting columns
815818
let updated_parent_req = requirement
816819
.into_iter()
817820
.map(|req| {
818-
let child_schema = plan.children()[1].schema();
821+
let child_schema = plan_children[1].schema();
819822
let updated_columns = req
820823
.expr
821824
.transform_up(|expr| {

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,10 +563,14 @@ fn apply_subrules(
563563
subrules: &Vec<Box<PipelineFixerSubrule>>,
564564
config_options: &ConfigOptions,
565565
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
566+
let original = Arc::clone(&input);
566567
for subrule in subrules {
567568
input = subrule(input, config_options)?;
568569
}
569-
Ok(Transformed::yes(input))
570+
571+
let transformed = !Arc::ptr_eq(&original, &input);
572+
573+
Ok(Transformed::new_transformed(input, transformed))
570574
}
571575

572576
// See tests in datafusion/core/tests/physical_optimizer

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 46 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
//! data transfer as much as possible.
2020
//!
2121
//! # Plan Limit Absorption
22-
//! In addition to pushing down [`LimitExec`] in the plan, some operators can
23-
//! "absorb" a limit and stop early during execution.
22+
//! In addition to pushing down `GlobalLimitExec` and `LocalLimitExec` nodes in
23+
//! the plan, some operators can "absorb" a limit and stop early during
24+
//! execution.
2425
//!
2526
//! ## Background: vectorized volcano execution model
2627
//! DataFusion uses a batched volcano model. For most operators, output is
@@ -33,7 +34,7 @@
3334
//! ## Example
3435
//! For a join with an expensive, selective predicate:
3536
//! ```text
36-
//! LimitExec(fetch=10)
37+
//! GlobalLimitExec: skip=0, fetch=10
3738
//! -- NestedLoopJoinExec(on=expr_expensive_and_selective)
3839
//! --- DataSourceExec()
3940
//! --- DataSourceExec()
@@ -78,14 +79,13 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
7879
pub struct LimitPushdown {}
7980

8081
/// This is a "data class" we use within the [`LimitPushdown`] rule to push
81-
/// down [`LimitExec`] in the plan. GlobalRequirements are hold as a rule-wide state
82+
/// down limits in the plan. GlobalRequirements are held as rule-wide state
8283
/// and holds the fetch and skip information. The struct also has a field named
8384
/// satisfied which indicates whether the "current" plan is valid in terms of limits.
8485
///
8586
/// For example: If the plan is satisfied with current fetch info, we decide to not add a LocalLimit
8687
///
8788
/// [`LimitPushdown`]: crate::limit_pushdown::LimitPushdown
88-
/// [`LimitExec`]: crate::limit_pushdown::LimitExec
8989
#[derive(Default, Clone, Debug)]
9090
pub struct GlobalRequirements {
9191
fetch: Option<usize>,
@@ -125,51 +125,11 @@ impl PhysicalOptimizerRule for LimitPushdown {
125125
}
126126
}
127127

128-
/// This enumeration makes `skip` and `fetch` calculations easier by providing
129-
/// a single API for both local and global limit operators.
130-
#[derive(Debug)]
131-
pub enum LimitExec {
132-
Global(GlobalLimitExec),
133-
Local(LocalLimitExec),
134-
}
135-
136-
impl LimitExec {
137-
fn input(&self) -> &Arc<dyn ExecutionPlan> {
138-
match self {
139-
Self::Global(global) => global.input(),
140-
Self::Local(local) => local.input(),
141-
}
142-
}
143-
144-
fn fetch(&self) -> Option<usize> {
145-
match self {
146-
Self::Global(global) => global.fetch(),
147-
Self::Local(local) => Some(local.fetch()),
148-
}
149-
}
150-
151-
fn skip(&self) -> usize {
152-
match self {
153-
Self::Global(global) => global.skip(),
154-
Self::Local(_) => 0,
155-
}
156-
}
157-
158-
fn preserve_order(&self) -> bool {
159-
match self {
160-
Self::Global(global) => global.required_ordering().is_some(),
161-
Self::Local(local) => local.required_ordering().is_some(),
162-
}
163-
}
164-
}
165-
166-
impl From<LimitExec> for Arc<dyn ExecutionPlan> {
167-
fn from(limit_exec: LimitExec) -> Self {
168-
match limit_exec {
169-
LimitExec::Global(global) => Arc::new(global),
170-
LimitExec::Local(local) => Arc::new(local),
171-
}
172-
}
128+
struct LimitInfo {
129+
input: Arc<dyn ExecutionPlan>,
130+
fetch: Option<usize>,
131+
skip: usize,
132+
preserve_order: bool,
173133
}
174134

175135
/// This function is the main helper function of the `LimitPushDown` rule.
@@ -184,26 +144,26 @@ pub fn pushdown_limit_helper(
184144
mut global_state: GlobalRequirements,
185145
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
186146
// Extract limit, if exist, and return child inputs.
187-
if let Some(limit_exec) = extract_limit(&pushdown_plan) {
147+
if let Some(limit_info) = extract_limit(&pushdown_plan) {
188148
// If we have fetch/skip info in the global state already, we need to
189149
// decide which one to continue with:
190150
let (skip, fetch) = combine_limit(
191151
global_state.skip,
192152
global_state.fetch,
193-
limit_exec.skip(),
194-
limit_exec.fetch(),
153+
limit_info.skip,
154+
limit_info.fetch,
195155
);
196156
global_state.skip = skip;
197157
global_state.fetch = fetch;
198-
global_state.preserve_order = limit_exec.preserve_order();
158+
global_state.preserve_order = limit_info.preserve_order;
199159
global_state.satisfied = false;
200160

201161
// Now the global state has the most recent information, we can remove
202-
// the `LimitExec` plan. We will decide later if we should add it again
203-
// or not.
162+
// the limit node. We will decide later if we should add it again or
163+
// not.
204164
return Ok((
205165
Transformed {
206-
data: Arc::clone(limit_exec.input()),
166+
data: limit_info.input,
207167
transformed: true,
208168
tnr: TreeNodeRecursion::Stop,
209169
},
@@ -253,7 +213,7 @@ pub fn pushdown_limit_helper(
253213
Ok((Transformed::no(pushdown_plan), global_state))
254214
} else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
255215
// This plan is combining input partitions, so we need to add the
256-
// fetch info to plan if possible. If not, we must add a `LimitExec`
216+
// fetch info to plan if possible. If not, we must add a limit node
257217
// with the information from the global state.
258218
let mut new_plan = plan_with_fetch;
259219
// Execution plans can't (yet) handle skip, so if we have one,
@@ -341,32 +301,45 @@ pub(crate) fn pushdown_limits(
341301

342302
// Apply pushdown limits in children
343303
let children = new_node.data.children();
304+
let mut changed = false;
344305
let new_children = children
345306
.into_iter()
346-
.map(|child| {
347-
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
307+
.map(|child: &Arc<dyn ExecutionPlan>| {
308+
let new_child = pushdown_limits(
309+
Arc::<dyn ExecutionPlan>::clone(child),
310+
global_state.clone(),
311+
)?;
312+
// Tracking if any of the children changed
313+
changed |= !Arc::ptr_eq(child, &new_child);
314+
Ok(new_child)
348315
})
349316
.collect::<Result<_>>()?;
350-
new_node.data.with_new_children(new_children)
317+
318+
if changed {
319+
new_node.data.with_new_children(new_children)
320+
} else {
321+
Ok(new_node.data)
322+
}
351323
}
352324

353-
/// Transforms the [`ExecutionPlan`] into a [`LimitExec`] if it is a
325+
/// Extracts limit information from the [`ExecutionPlan`] if it is a
354326
/// [`GlobalLimitExec`] or a [`LocalLimitExec`].
355-
fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
327+
fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitInfo> {
356328
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
357-
Some(LimitExec::Global(GlobalLimitExec::new(
358-
Arc::clone(global_limit.input()),
359-
global_limit.skip(),
360-
global_limit.fetch(),
361-
)))
329+
Some(LimitInfo {
330+
input: Arc::clone(global_limit.input()),
331+
fetch: global_limit.fetch(),
332+
skip: global_limit.skip(),
333+
preserve_order: global_limit.required_ordering().is_some(),
334+
})
362335
} else {
363336
plan.as_any()
364337
.downcast_ref::<LocalLimitExec>()
365-
.map(|local_limit| {
366-
LimitExec::Local(LocalLimitExec::new(
367-
Arc::clone(local_limit.input()),
368-
local_limit.fetch(),
369-
))
338+
.map(|local_limit| LimitInfo {
339+
input: Arc::clone(local_limit.input()),
340+
fetch: Some(local_limit.fetch()),
341+
skip: 0,
342+
preserve_order: local_limit.required_ordering().is_some(),
370343
})
371344
}
372345
}

datafusion/physical-optimizer/src/limited_distinct_aggregation.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,8 @@ impl LimitedDistinctAggregation {
5454
}
5555

5656
// We found what we want: clone, copy the limit down, and return modified node
57-
let new_aggr = AggregateExec::try_new(
58-
*aggr.mode(),
59-
aggr.group_expr().clone(),
60-
aggr.aggr_expr().to_vec(),
61-
aggr.filter_expr().to_vec(),
62-
aggr.input().to_owned(),
63-
aggr.input_schema(),
64-
)
65-
.expect("Unable to copy Aggregate!")
66-
.with_limit_options(Some(LimitOptions::new(limit)));
57+
let new_aggr = aggr.with_new_limit_options(Some(LimitOptions::new(limit)));
58+
6759
Some(Arc::new(new_aggr))
6860
}
6961

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,14 @@ fn require_top_ordering_helper(
430430
// be responsible for (i.e. the originator of) the global ordering.
431431
let (new_child, is_changed) =
432432
require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
433-
Ok((plan.with_new_children(vec![new_child])?, is_changed))
433+
434+
let plan = if is_changed {
435+
plan.with_new_children(vec![new_child])?
436+
} else {
437+
plan
438+
};
439+
440+
Ok((plan, is_changed))
434441
} else {
435442
// Stop searching, there is no global ordering desired for the query.
436443
Ok((plan, false))

0 commit comments

Comments
 (0)