Skip to content

Commit eb1af95

Browse files
committed
refactor: replace SPM prefetch with BufferExec for sort elimination
When PushdownSort eliminates SortExec under SortPreservingMergeExec, insert BufferExec(8MB) to replace SortExec's buffering role. This is strictly better: same I/O pipelining with bounded memory, no sort computation, and no impact on other SPM usage. Revert SPM prefetch field — SPM stays at spawn_buffered(1) for all cases. BufferExec only inserted by PushdownSort when needed.
1 parent 63fd896 commit eb1af95

3 files changed

Lines changed: 132 additions & 44 deletions

File tree

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,19 @@ use datafusion_common::config::ConfigOptions;
6060
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
6161
use datafusion_physical_plan::ExecutionPlan;
6262
use datafusion_physical_plan::SortOrderPushdownResult;
63+
use datafusion_physical_plan::buffer::BufferExec;
6364
use datafusion_physical_plan::sorts::sort::SortExec;
6465
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
6566
use std::sync::Arc;
6667

67-
/// Prefetch buffer size for SortPreservingMergeExec when sort elimination
68-
/// removes the buffering SortExec between SPM and DataSourceExec.
69-
const SPM_PREFETCH_AFTER_SORT_ELIMINATION: usize = 16;
68+
/// Capacity (in bytes) of the BufferExec inserted between SPM and DataSourceExec when
69+
/// sort elimination removes the buffering SortExec.
70+
///
71+
/// SortExec buffers all input data in memory (with spill support) before
72+
/// outputting sorted results. When we eliminate SortExec, SPM reads directly
73+
/// from I/O-bound sources. BufferExec compensates by buffering batches in
74+
/// the background, allowing I/O to pipeline with merge computation.
75+
const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 8 * 1024 * 1024; // 8 MB
7076

7177
/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
7278
///
@@ -92,11 +98,11 @@ impl PhysicalOptimizerRule for PushdownSort {
9298
}
9399

94100
// Use transform_down to find and optimize all SortExec nodes (including nested ones)
95-
// Also handles SPM → SortExec pattern to set prefetch when sort is eliminated
101+
// Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
96102
plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
97103
// Pattern 1: SPM → SortExec(preserve_partitioning)
98104
// When we eliminate the SortExec, SPM loses its memory buffer and reads
99-
// directly from I/O-bound sources. Set a larger prefetch to pipeline I/O.
105+
// directly from I/O-bound sources. Insert a BufferExec to compensate.
100106
if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
101107
&& let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
102108
&& sort_child.preserve_partitioning()
@@ -110,13 +116,16 @@ impl PhysicalOptimizerRule for PushdownSort {
110116
} else {
111117
inner
112118
};
119+
// Insert BufferExec to replace SortExec's buffering role.
120+
// SortExec buffered all data in memory; BufferExec provides
121+
// bounded buffering so SPM doesn't stall on I/O.
122+
let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(
123+
inner,
124+
BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
125+
));
113126
let new_spm =
114-
SortPreservingMergeExec::new(spm.expr().clone(), inner)
115-
.with_fetch(spm.fetch())
116-
.with_round_robin_repartition(
117-
spm.enable_round_robin_repartition(),
118-
)
119-
.with_prefetch(SPM_PREFETCH_AFTER_SORT_ELIMINATION);
127+
SortPreservingMergeExec::new(spm.expr().clone(), buffered)
128+
.with_fetch(spm.fetch());
120129
return Ok(Transformed::yes(Arc::new(new_spm)));
121130
}
122131
SortOrderPushdownResult::Inexact { inner } => {
@@ -127,10 +136,7 @@ impl PhysicalOptimizerRule for PushdownSort {
127136
spm.expr().clone(),
128137
Arc::new(new_sort),
129138
)
130-
.with_fetch(spm.fetch())
131-
.with_round_robin_repartition(
132-
spm.enable_round_robin_repartition(),
133-
);
139+
.with_fetch(spm.fetch());
134140
return Ok(Transformed::yes(Arc::new(new_spm)));
135141
}
136142
SortOrderPushdownResult::Unsupported => {

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,6 @@ pub struct SortPreservingMergeExec {
100100
///
101101
/// See [`Self::with_round_robin_repartition`] for more information.
102102
enable_round_robin_repartition: bool,
103-
/// Number of batches to prefetch from each input partition.
104-
///
105-
/// When SPM reads directly from I/O-bound sources (e.g., after sort
106-
/// elimination removes a buffering SortExec), a larger prefetch allows
107-
/// pipelining I/O with merge computation. Defaults to 1.
108-
prefetch: usize,
109103
}
110104

111105
impl SortPreservingMergeExec {
@@ -119,7 +113,6 @@ impl SortPreservingMergeExec {
119113
fetch: None,
120114
cache: Arc::new(cache),
121115
enable_round_robin_repartition: true,
122-
prefetch: 1,
123116
}
124117
}
125118

@@ -146,21 +139,6 @@ impl SortPreservingMergeExec {
146139
self
147140
}
148141

149-
/// Sets the number of batches to prefetch from each input partition.
150-
///
151-
/// A larger value allows pipelining I/O with merge computation, which
152-
/// helps when inputs are I/O-bound (e.g., reading directly from
153-
/// DataSourceExec without a buffering SortExec in between).
154-
pub fn with_prefetch(mut self, prefetch: usize) -> Self {
155-
self.prefetch = prefetch;
156-
self
157-
}
158-
159-
/// Returns the prefetch buffer size
160-
pub fn prefetch(&self) -> usize {
161-
self.prefetch
162-
}
163-
164142
/// Input plan
165143
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
166144
&self.input
@@ -176,11 +154,6 @@ impl SortPreservingMergeExec {
176154
self.fetch
177155
}
178156

179-
/// Whether round-robin repartition is enabled
180-
pub fn enable_round_robin_repartition(&self) -> bool {
181-
self.enable_round_robin_repartition
182-
}
183-
184157
/// Creates the cache object that stores the plan properties
185158
/// such as schema, equivalence properties, ordering, partitioning, etc.
186159
fn compute_properties(
@@ -278,7 +251,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
278251
fetch: limit,
279252
cache: Arc::clone(&self.cache),
280253
enable_round_robin_repartition: self.enable_round_robin_repartition,
281-
prefetch: self.prefetch,
282254
}))
283255
}
284256

@@ -389,7 +361,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
389361
.map(|partition| {
390362
let stream =
391363
self.input.execute(partition, Arc::clone(&context))?;
392-
Ok(spawn_buffered(stream, self.prefetch))
364+
Ok(spawn_buffered(stream, 1))
393365
})
394366
.collect::<Result<_>>()?;
395367

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2161,6 +2161,116 @@ DROP TABLE null_src_b;
21612161
statement ok
21622162
DROP TABLE tf_nulls_last;
21632163

2164+
# ===========================================================
2165+
# Test G: BufferExec insertion when sort elimination removes
2166+
# SortExec under SortPreservingMergeExec.
2167+
#
2168+
# When PushdownSort eliminates SortExec(preserve_partitioning=true),
2169+
# SPM loses SortExec's memory buffer. A BufferExec is inserted to
2170+
# compensate, allowing I/O pipelining with merge computation.
2171+
# ===========================================================
2172+
2173+
# Create files with reversed naming: c_low has smallest values,
2174+
# a_high has largest — alphabetical order ≠ sort key order.
2175+
statement ok
2176+
CREATE TABLE tg_src_low(id INT, value INT) AS VALUES (1, 100), (2, 200), (3, 300);
2177+
2178+
statement ok
2179+
CREATE TABLE tg_src_mid(id INT, value INT) AS VALUES (4, 400), (5, 500), (6, 600);
2180+
2181+
statement ok
2182+
CREATE TABLE tg_src_high(id INT, value INT) AS VALUES (7, 700), (8, 800), (9, 900), (10, 1000);
2183+
2184+
query I
2185+
COPY (SELECT * FROM tg_src_low ORDER BY id ASC)
2186+
TO 'test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet';
2187+
----
2188+
3
2189+
2190+
query I
2191+
COPY (SELECT * FROM tg_src_mid ORDER BY id ASC)
2192+
TO 'test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet';
2193+
----
2194+
3
2195+
2196+
query I
2197+
COPY (SELECT * FROM tg_src_high ORDER BY id ASC)
2198+
TO 'test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet';
2199+
----
2200+
4
2201+
2202+
# Use target_partitions=2 so files are split across 2 groups.
2203+
# File names sort alphabetically in a different order than their id ranges → validated_output_ordering strips ordering
2204+
# → EnforceSorting adds SortExec(preserve_partitioning) + SPM
2205+
# → PushdownSort eliminates SortExec and inserts BufferExec
2206+
statement ok
2207+
SET datafusion.execution.target_partitions = 2;
2208+
2209+
statement ok
2210+
CREATE EXTERNAL TABLE tg_buffer(id INT, value INT)
2211+
STORED AS PARQUET
2212+
LOCATION 'test_files/scratch/sort_pushdown/tg_buffer/'
2213+
WITH ORDER (id ASC);
2214+
2215+
# Test G.1: BufferExec appears between SPM and DataSourceExec
2216+
query TT
2217+
EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC;
2218+
----
2219+
logical_plan
2220+
01)Sort: tg_buffer.id ASC NULLS LAST
2221+
02)--TableScan: tg_buffer projection=[id, value]
2222+
physical_plan
2223+
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
2224+
02)--BufferExec: capacity=8388608
2225+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
2226+
2227+
# Verify correctness
2228+
query II
2229+
SELECT * FROM tg_buffer ORDER BY id ASC;
2230+
----
2231+
1 100
2232+
2 200
2233+
3 300
2234+
4 400
2235+
5 500
2236+
6 600
2237+
7 700
2238+
8 800
2239+
9 900
2240+
10 1000
2241+
2242+
# Test G.2: LIMIT query with BufferExec
2243+
query TT
2244+
EXPLAIN SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3;
2245+
----
2246+
logical_plan
2247+
01)Sort: tg_buffer.id ASC NULLS LAST, fetch=3
2248+
02)--TableScan: tg_buffer projection=[id, value]
2249+
physical_plan
2250+
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
2251+
02)--BufferExec: capacity=8388608
2252+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
2253+
2254+
query II
2255+
SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3;
2256+
----
2257+
1 100
2258+
2 200
2259+
3 300
2260+
2261+
# Cleanup Test G
2262+
statement ok
2263+
DROP TABLE tg_src_low;
2264+
2265+
statement ok
2266+
DROP TABLE tg_src_mid;
2267+
2268+
statement ok
2269+
DROP TABLE tg_src_high;
2270+
2271+
statement ok
2272+
DROP TABLE tg_buffer;
2273+
21642274
# Reset settings (SLT runner uses target_partitions=4, not system default)
21652275
statement ok
21662276
SET datafusion.execution.target_partitions = 4;

0 commit comments

Comments
 (0)