From 3e3487727a4d87f8c9572b5bd8fad371cb1d9457 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 7 Apr 2026 10:04:17 +0800 Subject: [PATCH 1/4] feat: make sort pushdown BufferExec capacity configurable, default 1GB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace hardcoded 64MB BUFFER_CAPACITY_AFTER_SORT_ELIMINATION with configurable `datafusion.execution.sort_pushdown_buffer_capacity`. Default 1GB — large enough to hold wide-row data without I/O stalls. This is strictly less memory than the SortExec it replaces, and actual usage is bounded by partition size and global memory pool limits. Closes #21417 --- datafusion/common/src/config.rs | 13 +++++++++++ .../physical-optimizer/src/pushdown_sort.rs | 22 ++++--------------- .../sqllogictest/test_files/sort_pushdown.slt | 4 ++-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 38b18c06fe930..53f2501d60752 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -557,6 +557,19 @@ config_namespace! { /// batches and merged. pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + /// Maximum buffer capacity (in bytes) per partition for BufferExec + /// inserted during sort pushdown optimization. + /// + /// When PushdownSort eliminates a SortExec under SortPreservingMergeExec, + /// a BufferExec is inserted to replace SortExec's buffering role. This + /// prevents I/O stalls by allowing the scan to run ahead of the merge. + /// + /// This uses strictly less memory than the SortExec it replaces (which + /// buffers the entire partition). The buffer respects the global memory + /// pool limit. Setting this to a large value is safe — actual memory + /// usage is bounded by partition size and global memory limits. + pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024 + /// Maximum size in bytes for individual spill files before rotating to a new file. /// /// When operators spill data to disk (e.g., RepartitionExec), they write diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 308e91d0df145..9da8ce60e1ba5 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -65,20 +65,6 @@ use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use std::sync::Arc; -/// Per-partition buffer capacity (in bytes) inserted between SPM and -/// DataSourceExec when sort elimination removes the buffering SortExec. -/// -/// SortExec buffers all input data in memory (potentially GB per partition) -/// before outputting sorted results. When we eliminate SortExec, SPM reads -/// directly from I/O-bound sources. BufferExec compensates with bounded -/// buffering, allowing I/O to pipeline with merge computation. -/// -/// This is strictly less memory than the SortExec it replaces, and only -/// inserted when PushdownSort eliminates a SortExec — no impact on other -/// query plans. BufferExec also integrates with MemoryPool, so it respects -/// the global memory limit and won't cause OOM. -const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB - /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. /// /// See module-level documentation for details. @@ -102,6 +88,8 @@ impl PhysicalOptimizerRule for PushdownSort { return Ok(plan); } + let buffer_capacity = config.execution.sort_pushdown_buffer_capacity; + // Use transform_down to find and optimize all SortExec nodes (including nested ones) // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated plan.transform_down(|plan: Arc| { @@ -124,10 +112,8 @@ impl PhysicalOptimizerRule for PushdownSort { // Insert BufferExec to replace SortExec's buffering role. // SortExec buffered all data in memory; BufferExec provides // bounded buffering so SPM doesn't stall on I/O. - let buffered: Arc = Arc::new(BufferExec::new( - inner, - BUFFER_CAPACITY_AFTER_SORT_ELIMINATION, - )); + let buffered: Arc = + Arc::new(BufferExec::new(inner, buffer_capacity)); let new_spm = SortPreservingMergeExec::new(spm.expr().clone(), buffered) .with_fetch(spm.fetch()); diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index e9d4e221e1ddb..b6c75f3977010 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2221,7 +2221,7 @@ logical_plan 02)--TableScan: tg_buffer projection=[id, value] physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST] -02)--BufferExec: capacity=67108864 +02)--BufferExec: capacity=1073741824 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet # Verify correctness @@ -2248,7 +2248,7 @@ logical_plan 02)--TableScan: tg_buffer projection=[id, value] physical_plan 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3 -02)--BufferExec: capacity=67108864 +02)--BufferExec: capacity=1073741824 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet query II From 130acd475b1dab0d05b42f052f29d2df4fccd2cf Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 7 Apr 2026 12:16:00 +0800 Subject: [PATCH 2/4] docs: update configs.md for sort_pushdown_buffer_capacity --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index be42f4a0becb8..b88727e7e3b52 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -118,6 +118,7 @@ The following configuration settings are available: | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.sort_pushdown_buffer_capacity | 1073741824 | Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits. | | datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | From 626c9ff2e84b4017656c5e5fa0683f55629edda8 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 7 Apr 2026 14:32:49 +0800 Subject: [PATCH 3/4] ci: trigger CI From f944ac69324ea93a9614ad453aa982564bc3ff4e Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 7 Apr 2026 15:38:11 +0800 Subject: [PATCH 4/4] fix: add sort_pushdown_buffer_capacity to information_schema SLT --- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 77ae1d335fb8d..0b34f381cbc59 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -270,6 +270,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_pushdown_buffer_capacity 1073741824 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.spill_compression uncompressed datafusion.execution.split_file_groups_by_statistics false @@ -413,6 +414,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. +datafusion.execution.sort_pushdown_buffer_capacity 1073741824 Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental