Skip to content

Commit 3e34877

Browse files
committed
feat: make sort pushdown BufferExec capacity configurable, default 1GB
Replace hardcoded 64MB BUFFER_CAPACITY_AFTER_SORT_ELIMINATION with configurable `datafusion.execution.sort_pushdown_buffer_capacity`. Default 1GB — large enough to hold wide-row data without I/O stalls. This is strictly less memory than the SortExec it replaces, and actual usage is bounded by partition size and global memory pool limits. Closes #21417
1 parent cdfade5 commit 3e34877

3 files changed

Lines changed: 19 additions & 20 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,19 @@ config_namespace! {
557557
/// batches and merged.
558558
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
559559

560+
/// Maximum buffer capacity (in bytes) per partition for BufferExec
561+
/// inserted during sort pushdown optimization.
562+
///
563+
/// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
564+
/// a BufferExec is inserted to replace SortExec's buffering role. This
565+
/// prevents I/O stalls by allowing the scan to run ahead of the merge.
566+
///
567+
/// This uses strictly less memory than the SortExec it replaces (which
568+
/// buffers the entire partition). The buffer respects the global memory
569+
/// pool limit. Setting this to a large value is safe — actual memory
570+
/// usage is bounded by partition size and global memory limits.
571+
pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024
572+
560573
/// Maximum size in bytes for individual spill files before rotating to a new file.
561574
///
562575
/// When operators spill data to disk (e.g., RepartitionExec), they write

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,6 @@ use datafusion_physical_plan::sorts::sort::SortExec;
6565
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
6666
use std::sync::Arc;
6767

68-
/// Per-partition buffer capacity (in bytes) inserted between SPM and
69-
/// DataSourceExec when sort elimination removes the buffering SortExec.
70-
///
71-
/// SortExec buffers all input data in memory (potentially GB per partition)
72-
/// before outputting sorted results. When we eliminate SortExec, SPM reads
73-
/// directly from I/O-bound sources. BufferExec compensates with bounded
74-
/// buffering, allowing I/O to pipeline with merge computation.
75-
///
76-
/// This is strictly less memory than the SortExec it replaces, and only
77-
/// inserted when PushdownSort eliminates a SortExec — no impact on other
78-
/// query plans. BufferExec also integrates with MemoryPool, so it respects
79-
/// the global memory limit and won't cause OOM.
80-
const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB
81-
8268
/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
8369
///
8470
/// See module-level documentation for details.
@@ -102,6 +88,8 @@ impl PhysicalOptimizerRule for PushdownSort {
10288
return Ok(plan);
10389
}
10490

91+
let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;
92+
10593
// Use transform_down to find and optimize all SortExec nodes (including nested ones)
10694
// Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
10795
plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
@@ -124,10 +112,8 @@ impl PhysicalOptimizerRule for PushdownSort {
124112
// Insert BufferExec to replace SortExec's buffering role.
125113
// SortExec buffered all data in memory; BufferExec provides
126114
// bounded buffering so SPM doesn't stall on I/O.
127-
let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(
128-
inner,
129-
BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
130-
));
115+
let buffered: Arc<dyn ExecutionPlan> =
116+
Arc::new(BufferExec::new(inner, buffer_capacity));
131117
let new_spm =
132118
SortPreservingMergeExec::new(spm.expr().clone(), buffered)
133119
.with_fetch(spm.fetch());

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2221,7 +2221,7 @@ logical_plan
22212221
02)--TableScan: tg_buffer projection=[id, value]
22222222
physical_plan
22232223
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
2224-
02)--BufferExec: capacity=67108864
2224+
02)--BufferExec: capacity=1073741824
22252225
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
22262226

22272227
# Verify correctness
@@ -2248,7 +2248,7 @@ logical_plan
22482248
02)--TableScan: tg_buffer projection=[id, value]
22492249
physical_plan
22502250
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
2251-
02)--BufferExec: capacity=67108864
2251+
02)--BufferExec: capacity=1073741824
22522252
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
22532253

22542254
query II

0 commit comments

Comments
 (0)