Skip to content

Commit 7cadeca

Browse files
committed
refactor: scope SPM prefetch buffer to sort elimination path only
Add prefetch field to SortPreservingMergeExec (default 1) instead of hardcoding buffer=16 globally. PushdownSort sets prefetch=16 only when eliminating SortExec under SPM, avoiding impact on unrelated queries. Also detect SPM → SortExec(preserve_partitioning) pattern directly in PushdownSort to handle sort elimination and prefetch in one pass.
1 parent c1d0a33 commit 7cadeca

2 files changed

Lines changed: 87 additions & 3 deletions

File tree

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,13 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
6161
use datafusion_physical_plan::ExecutionPlan;
6262
use datafusion_physical_plan::SortOrderPushdownResult;
6363
use datafusion_physical_plan::sorts::sort::SortExec;
64+
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
6465
use std::sync::Arc;
6566

67+
/// Prefetch buffer size for SortPreservingMergeExec when sort elimination
68+
/// removes the buffering SortExec between SPM and DataSourceExec.
69+
const SPM_PREFETCH_AFTER_SORT_ELIMINATION: usize = 16;
70+
6671
/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
6772
///
6873
/// See module-level documentation for details.
@@ -87,8 +92,59 @@ impl PhysicalOptimizerRule for PushdownSort {
8792
}
8893

8994
// Use transform_down to find and optimize all SortExec nodes (including nested ones)
95+
// Also handles SPM → SortExec pattern to set prefetch when sort is eliminated
9096
plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
91-
// Check if this is a SortExec
97+
// Pattern 1: SPM → SortExec(preserve_partitioning)
98+
// When we eliminate the SortExec, SPM loses its memory buffer and reads
99+
// directly from I/O-bound sources. Set a larger prefetch to pipeline I/O.
100+
if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
101+
&& let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
102+
&& sort_child.preserve_partitioning()
103+
{
104+
let sort_input = Arc::clone(sort_child.input());
105+
let required_ordering = sort_child.expr();
106+
match sort_input.try_pushdown_sort(required_ordering)? {
107+
SortOrderPushdownResult::Exact { inner } => {
108+
let inner = if let Some(fetch) = sort_child.fetch() {
109+
inner.with_fetch(Some(fetch)).unwrap_or(inner)
110+
} else {
111+
inner
112+
};
113+
let new_spm = SortPreservingMergeExec::new(
114+
spm.expr().clone(),
115+
inner,
116+
)
117+
.with_fetch(spm.fetch())
118+
.with_round_robin_repartition(
119+
spm.enable_round_robin_repartition(),
120+
)
121+
.with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION);
122+
return Ok(Transformed::yes(Arc::new(new_spm)));
123+
}
124+
SortOrderPushdownResult::Inexact { inner } => {
125+
let new_sort = SortExec::new(
126+
required_ordering.clone(),
127+
inner,
128+
)
129+
.with_fetch(sort_child.fetch())
130+
.with_preserve_partitioning(true);
131+
let new_spm = SortPreservingMergeExec::new(
132+
spm.expr().clone(),
133+
Arc::new(new_sort),
134+
)
135+
.with_fetch(spm.fetch())
136+
.with_round_robin_repartition(
137+
spm.enable_round_robin_repartition(),
138+
);
139+
return Ok(Transformed::yes(Arc::new(new_spm)));
140+
}
141+
SortOrderPushdownResult::Unsupported => {
142+
return Ok(Transformed::no(plan));
143+
}
144+
}
145+
}
146+
147+
// Pattern 2: Standalone SortExec (no SPM parent)
92148
let Some(sort_exec) = plan.downcast_ref::<SortExec>() else {
93149
return Ok(Transformed::no(plan));
94150
};

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ pub struct SortPreservingMergeExec {
100100
///
101101
/// See [`Self::with_round_robin_repartition`] for more information.
102102
enable_round_robin_repartition: bool,
103+
/// Number of batches to prefetch from each input partition.
104+
///
105+
/// When SPM reads directly from I/O-bound sources (e.g., after sort
106+
/// elimination removes a buffering SortExec), a larger prefetch allows
107+
/// pipelining I/O with merge computation. Defaults to 1.
108+
prefetch: usize,
103109
}
104110

105111
impl SortPreservingMergeExec {
@@ -113,6 +119,7 @@ impl SortPreservingMergeExec {
113119
fetch: None,
114120
cache: Arc::new(cache),
115121
enable_round_robin_repartition: true,
122+
prefetch: 1,
116123
}
117124
}
118125

@@ -139,6 +146,21 @@ impl SortPreservingMergeExec {
139146
self
140147
}
141148

149+
/// Sets the number of batches to prefetch from each input partition.
150+
///
151+
/// A larger value allows pipelining I/O with merge computation, which
152+
/// helps when inputs are I/O-bound (e.g., reading directly from
153+
/// DataSourceExec without a buffering SortExec in between).
154+
pub fn with_prefetch(mut self, prefetch: usize) -> Self {
155+
self.prefetch = prefetch;
156+
self
157+
}
158+
159+
/// Returns the prefetch buffer size
160+
pub fn prefetch(&self) -> usize {
161+
self.prefetch
162+
}
163+
142164
/// Input schema
143165
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
144166
&self.input
@@ -154,6 +176,11 @@ impl SortPreservingMergeExec {
154176
self.fetch
155177
}
156178

179+
/// Whether round-robin repartition is enabled
180+
pub fn enable_round_robin_repartition(&self) -> bool {
181+
self.enable_round_robin_repartition
182+
}
183+
157184
/// Creates the cache object that stores the plan properties
158185
/// such as schema, equivalence properties, ordering, partitioning, etc.
159186
fn compute_properties(
@@ -250,7 +277,8 @@ impl ExecutionPlan for SortPreservingMergeExec {
250277
metrics: self.metrics.clone(),
251278
fetch: limit,
252279
cache: Arc::clone(&self.cache),
253-
enable_round_robin_repartition: true,
280+
enable_round_robin_repartition: self.enable_round_robin_repartition,
281+
prefetch: self.prefetch,
254282
}))
255283
}
256284

@@ -361,7 +389,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
361389
.map(|partition| {
362390
let stream =
363391
self.input.execute(partition, Arc::clone(&context))?;
364-
Ok(spawn_buffered(stream, 16))
392+
Ok(spawn_buffered(stream, self.prefetch))
365393
})
366394
.collect::<Result<_>>()?;
367395

0 commit comments

Comments
 (0)