Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions datafusion/physical-plan/src/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,24 @@ pub struct BatchBuilder {
/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,

/// Accounts for memory used by buffered batches
/// Accounts for memory used by buffered batches.
///
/// May include pre-reserved bytes (from `sort_spill_reservation_bytes`)
/// that were transferred via [`MemoryReservation::take()`] to prevent
/// starvation when concurrent sort partitions compete for pool memory.
reservation: MemoryReservation,

/// Tracks the actual memory used by buffered batches (not including
/// pre-reserved bytes). This allows [`Self::push_batch`] to skip pool
/// allocation requests when the pre-reserved bytes cover the batch.
batches_mem_used: usize,

/// The initial reservation size at construction time. When the reservation
/// is pre-loaded with `sort_spill_reservation_bytes` (via `take()`), this
/// records that amount so we never shrink below it, maintaining the
/// anti-starvation guarantee throughout the merge.
initial_reservation: usize,

/// The current [`BatchCursor`] for each stream
cursors: Vec<BatchCursor>,

Expand All @@ -59,19 +74,26 @@ impl BatchBuilder {
batch_size: usize,
reservation: MemoryReservation,
) -> Self {
let initial_reservation = reservation.size();
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
reservation,
batches_mem_used: 0,
initial_reservation,
}
}

/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
self.reservation
.try_grow(get_record_batch_memory_size(&batch))?;
let size = get_record_batch_memory_size(&batch);
self.batches_mem_used += size;
// Only request additional memory from the pool when actual batch
// usage exceeds the current reservation (which may include
// pre-reserved bytes from sort_spill_reservation_bytes).
try_grow_reservation_to_at_least(&mut self.reservation, self.batches_mem_used)?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
Expand Down Expand Up @@ -143,14 +165,38 @@ impl BatchBuilder {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.reservation.shrink(get_record_batch_memory_size(batch));
self.batches_mem_used -= get_record_batch_memory_size(batch);
}
retain
});

// Release excess memory back to the pool, but never shrink below
// initial_reservation to maintain the anti-starvation guarantee
// for the merge phase.
let target = self.batches_mem_used.max(self.initial_reservation);
if self.reservation.size() > target {
self.reservation.shrink(self.reservation.size() - target);
}

Ok(Some(RecordBatch::try_new(
Arc::clone(&self.schema),
columns,
)?))
}
}

/// Ensure `reservation` accounts for at least `needed` bytes.
///
/// When a reservation has been pre-loaded with bytes (e.g. via
/// [`MemoryReservation::take()`]), this avoids redundant pool
/// allocations: a reservation that already covers `needed` results in
/// a no-op, and otherwise only the missing difference is requested
/// from the pool.
pub(crate) fn try_grow_reservation_to_at_least(
    reservation: &mut MemoryReservation,
    needed: usize,
) -> Result<()> {
    let current = reservation.size();
    if current < needed {
        // Request only the deficit, not the full `needed` amount.
        reservation.try_grow(needed - current)?;
    }
    Ok(())
}
48 changes: 36 additions & 12 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

use crate::sorts::builder::try_grow_reservation_to_at_least;
use crate::sorts::sort::get_reserved_bytes_for_record_batch_size;
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -253,7 +254,12 @@ impl MultiLevelMergeBuilder {

// Need to merge multiple streams
(_, _) => {
let mut memory_reservation = self.reservation.new_empty();
// Transfer any pre-reserved bytes (from sort_spill_reservation_bytes)
// to the merge memory reservation. This prevents starvation when
// concurrent sort partitions compete for pool memory: the pre-reserved
// bytes cover spill file buffer reservations without additional pool
// allocation.
let mut memory_reservation = self.reservation.take();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like merge_sorted_runs_within_mem_limit() is transferring self.reservation into memory_reservation before it actually knows whether any spill files will be merged. If the builder already has enough in-memory streams to satisfy minimum_number_of_required_streams, but the first spill file still cannot fit, then get_sorted_spill_files_to_merge() could legitimately return zero spill files.

In that situation, is_only_merging_memory_streams would become true, but memory_reservation would still contain the bytes taken from self.reservation. That seems like it could trigger the assertion at lines 297–302 even though falling back to an all-in-memory merge is valid.

My understanding is that this creates a behavior regression in the mixed {sorted_streams + sorted_spill_files} path. Should the reservation transfer instead happen only after at least one spill file is selected, or should the unused reservation be returned to the all-in-memory merge path rather than being asserted away? 🤔

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Don't account for existing streams memory
// as we are not holding the memory for them
Expand All @@ -269,6 +275,15 @@ impl MultiLevelMergeBuilder {

let is_only_merging_memory_streams = sorted_spill_files.is_empty();

// If no spill files were selected (e.g. all too large for
// available memory but enough in-memory streams exist),
// return the pre-reserved bytes to self.reservation so
// create_new_merge_sort can transfer them to the merge
// stream's BatchBuilder.
if is_only_merging_memory_streams {
mem::swap(&mut self.reservation, &mut memory_reservation);
}

for spill in sorted_spill_files {
let stream = self
.spill_manager
Expand Down Expand Up @@ -337,8 +352,10 @@ impl MultiLevelMergeBuilder {
builder = builder.with_bypass_mempool();
} else {
// If we are only merging in-memory streams, we need to use the memory reservation
// because we don't know the maximum size of the batches in the streams
builder = builder.with_reservation(self.reservation.new_empty());
// because we don't know the maximum size of the batches in the streams.
// Use take() to transfer any pre-reserved bytes so the merge can use them
// as its initial budget without additional pool allocation.
builder = builder.with_reservation(self.reservation.take());
}

builder.build()
Expand All @@ -356,17 +373,24 @@ impl MultiLevelMergeBuilder {
) -> Result<(Vec<SortedSpillFile>, usize)> {
assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
let mut number_of_spills_to_read_for_current_phase = 0;
// Track total memory needed for spill file buffers. When the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like this whole method is ripe for a refactor, and introducing a memory floor is making it even more complex. Is there a way to incorporate the memory floor, but also simplify this a little bit?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a `try_grow_reservation_to_at_least` helper to reduce complexity.

// reservation has pre-reserved bytes (from sort_spill_reservation_bytes),
// those bytes cover the first N spill files without additional pool
// allocation, preventing starvation under memory pressure.
let mut total_needed: usize = 0;

for spill in &self.sorted_spill_files {
// For memory pools that are not shared this is good; for others it is not,
// and there should be some upper limit to the memory reservation so we won't starve the system
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment still applies: if you have multiple partitions running, one partition will still be able to starve the others

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'll keep the comment

match reservation.try_grow(
get_reserved_bytes_for_record_batch_size(
spill.max_record_batch_memory,
// Size will be the same as the sliced size, bc it is a spilled batch.
spill.max_record_batch_memory,
) * buffer_len,
) {
let per_spill = get_reserved_bytes_for_record_batch_size(
spill.max_record_batch_memory,
// Size will be the same as the sliced size, bc it is a spilled batch.
spill.max_record_batch_memory,
) * buffer_len;
total_needed += per_spill;

// For memory pools that are not shared this is good, for other
// this is not and there should be some upper limit to memory
// reservation so we won't starve the system.
match try_grow_reservation_to_at_least(reservation, total_needed) {
Ok(_) => {
number_of_spills_to_read_for_current_phase += 1;
}
Expand Down
159 changes: 153 additions & 6 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,6 @@ impl ExternalSorter {
/// 2. A combined streaming merge incorporating both in-memory
/// batches and data from spill files on disk.
async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation.
self.merge_reservation.free();

if self.spilled_before() {
// Sort `in_mem_batches` and spill it first. If there are many
// `in_mem_batches` and the memory limit is almost reached, merging
Expand All @@ -355,6 +350,13 @@ impl ExternalSorter {
self.sort_and_spill_in_mem_batches().await?;
}

// Transfer the pre-reserved merge memory to the streaming merge
// using `take()` instead of `new_empty()`. This ensures the merge
// stream starts with `sort_spill_reservation_bytes` already
// allocated, preventing starvation when concurrent sort partitions
// compete for pool memory. `take()` moves the bytes atomically
// without releasing them back to the pool, so other partitions
// cannot race to consume the freed memory.
Comment on lines +353 to +359
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pre-reserved merge memory should be used as part of the sort merge stream.
I mean that if x bytes of pre-reserved merge memory were reserved, the sort merge stream should know about that so it won't think it is starting from 0; otherwise this just reserves unaccounted memory.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Good point — just take() alone wouldn't be enough if the merge stream doesn't know about the pre-reserved bytes.

The PR does address this in the other changed files:

  • In builder.rs: BatchBuilder now tracks batches_mem_used separately and only calls try_grow() when actual usage exceeds the current reservation size. It also records initial_reservation so
    it never shrinks below that during build_output. This way the pre-reserved bytes are used as the initial budget rather than requesting from the pool on top of them.
  • In multi_level_merge.rs: get_sorted_spill_files_to_merge now tracks total_needed and only requests additional pool memory when total_needed > reservation.size(), so spill file buffers
    covered by the pre-reserved bytes don't trigger extra pool allocations.

So the merge stream is aware of the pre-reserved bytes and uses them as its starting budget — it doesn't think it's starting from 0.

StreamingMergeBuilder::new()
.with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
.with_spill_manager(self.spill_manager.clone())
Expand All @@ -363,9 +365,14 @@ impl ExternalSorter {
.with_metrics(self.metrics.baseline.clone())
.with_batch_size(self.batch_size)
.with_fetch(None)
.with_reservation(self.merge_reservation.new_empty())
.with_reservation(self.merge_reservation.take())
.build()
} else {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation. Only needed for the non-spill path; the spill
// path transfers the reservation to the merge stream instead.
self.merge_reservation.free();
self.in_mem_sort_stream(self.metrics.baseline.clone())
}
}
Expand All @@ -375,6 +382,12 @@ impl ExternalSorter {
self.reservation.size()
}

/// How much memory is reserved for the merge phase?
#[cfg(test)]
fn merge_reservation_size(&self) -> usize {
self.merge_reservation.size()
}

/// How many bytes have been spilled to disk?
fn spilled_bytes(&self) -> usize {
self.metrics.spill_metrics.spilled_bytes.value()
Expand Down Expand Up @@ -2716,4 +2729,138 @@ mod tests {

Ok(())
}

/// Verifies that `ExternalSorter::sort()` transfers the pre-reserved
/// merge bytes to the merge stream via `take()`, rather than leaving
/// them in the sorter (via `new_empty()`).
///
/// 1. Create a sorter with a tight memory pool and insert enough data
///    to force spilling
/// 2. Verify `merge_reservation` holds the pre-reserved bytes before sort
/// 3. Call `sort()` to get the merge stream
/// 4. Verify `merge_reservation` is now 0 (bytes transferred to merge stream)
/// 5. Simulate contention: a competing consumer grabs all available pool memory
/// 6. Verify the merge stream still works (it uses its pre-reserved bytes
///    as initial budget, not requesting from pool starting at 0)
///
/// With `new_empty()` (before fix), step 4 fails: `merge_reservation`
/// still holds the bytes, the merge stream starts with 0 budget, and
/// those bytes become unaccounted-for reserved memory that nobody uses.
#[tokio::test]
async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
    use datafusion_execution::memory_pool::{
        GreedyMemoryPool, MemoryConsumer, MemoryPool,
    };
    use futures::TryStreamExt;

    let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB

    // Pool: merge reservation (10KB) + enough room for sort to work.
    // The room must accommodate batch data accumulation before spilling.
    let sort_working_memory: usize = 40 * 1024; // 40 KB for sort operations
    let pool_size = sort_spill_reservation_bytes + sort_working_memory;
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::clone(&pool))
        .build_arc()?;

    let metrics_set = ExecutionPlanMetricsSet::new();
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));

    // NOTE(review): the first argument looks like the partition index —
    // confirm against `ExternalSorter::new`'s signature.
    let mut sorter = ExternalSorter::new(
        0,
        Arc::clone(&schema),
        [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 0)))].into(),
        128, // batch_size
        sort_spill_reservation_bytes,
        usize::MAX, // sort_in_place_threshold_bytes (high to avoid concat path)
        SpillCompression::Uncompressed,
        &metrics_set,
        Arc::clone(&runtime),
    )?;

    // Insert enough data to force spilling.
    // 200 batches x 100 i32 values (~80 KB of raw values) comfortably
    // exceeds the 40 KB working budget above.
    let num_batches = 200;
    for i in 0..num_batches {
        // Reversed ranges ensure the data is unsorted on insert.
        let values: Vec<i32> = ((i * 100)..((i + 1) * 100)).rev().collect();
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;
        sorter.insert_batch(batch).await?;
    }

    assert!(
        sorter.spilled_before(),
        "Test requires spilling to exercise the merge path"
    );

    // Before sort(), merge_reservation holds sort_spill_reservation_bytes.
    assert!(
        sorter.merge_reservation_size() >= sort_spill_reservation_bytes,
        "merge_reservation should hold the pre-reserved bytes before sort()"
    );

    // Call sort() to get the merge stream. With the fix (take()),
    // the pre-reserved merge bytes are transferred to the merge
    // stream. Without the fix (free() + new_empty()), the bytes
    // are released back to the pool and the merge stream starts
    // with 0 bytes.
    let merge_stream = sorter.sort().await?;

    // THE KEY ASSERTION: after sort(), merge_reservation must be 0.
    // This proves take() transferred the bytes to the merge stream,
    // rather than them being freed back to the pool where other
    // partitions could steal them.
    assert_eq!(
        sorter.merge_reservation_size(),
        0,
        "After sort(), merge_reservation should be 0 (bytes transferred \
         to merge stream via take()). If non-zero, the bytes are still \
         held by the sorter and will be freed on drop, allowing other \
         partitions to steal them."
    );

    // Drop the sorter to free its reservations back to the pool.
    drop(sorter);

    // Simulate contention: another partition grabs ALL available
    // pool memory. If the merge stream didn't receive the
    // pre-reserved bytes via take(), it will fail when it tries
    // to allocate memory for reading spill files.
    let contender = MemoryConsumer::new("CompetingPartition").register(&pool);
    // saturating_sub guards the case where reserved() already equals
    // (or transiently exceeds) pool_size.
    let available = pool_size.saturating_sub(pool.reserved());
    if available > 0 {
        contender.try_grow(available).unwrap();
    }

    // The merge stream must still produce correct results despite
    // the pool being fully consumed by the contender. This only
    // works if sort() transferred the pre-reserved bytes to the
    // merge stream (via take()) rather than freeing them.
    let batches: Vec<RecordBatch> = merge_stream.try_collect().await?;
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(
        total_rows,
        (num_batches * 100) as usize,
        "Merge stream should produce all rows even under memory contention"
    );

    // Verify data is sorted
    let merged = concat_batches(&schema, &batches)?;
    let col = merged.column(0).as_primitive::<Int32Type>();
    for i in 1..col.len() {
        assert!(
            col.value(i - 1) <= col.value(i),
            "Output should be sorted, but found {} > {} at index {}",
            col.value(i - 1),
            col.value(i),
            i
        );
    }

    drop(contender);
    Ok(())
}
}
Loading