Skip to content

Commit 9772af8

Browse files
committed
fix deadlock
1 parent 715c358 commit 9772af8

1 file changed

Lines changed: 15 additions & 1 deletion

File tree

  • datafusion/physical-plan/src/sorts

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -519,7 +519,11 @@ impl ExternalSorter {
519519
while let Some(batch) = sorted_stream.next().await {
520520
let batch = batch?;
521521
let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
522-
if self.reservation.try_grow_async(sorted_size).await.is_err() {
522+
// Sync `try_grow`, not `try_grow_async`: we are already in the
523+
// spill path (freeing memory). A recursive reclaim here can
524+
// close a cycle between two sorters that are each waiting on
525+
// the other's spill to complete.
526+
if self.reservation.try_grow(sorted_size).is_err() {
523527
// Although the reservation is not enough, the batch is
524528
// already in memory, so it's okay to combine it with previously
525529
// sorted batches, and spill together.
@@ -1310,6 +1314,16 @@ impl ExecutionPlan for SortExec {
13101314
tokio::select! {
13111315
biased;
13121316
Some(resp_tx) = reclaim_rx.recv() => {
1317+
// A reclaim can be dequeued just after a
1318+
// prior spill drained `in_mem_batches`
1319+
// (a sibling sent it during the spill's awaits;
1320+
// pool's zero-byte filter can transiently
1321+
// miss us via split reservations). Nothing
1322+
// local to free — reply 0 and keep going.
1323+
if sorter.in_mem_batches.is_empty() {
1324+
let _ = resp_tx.send(0);
1325+
continue;
1326+
}
13131327
let before = sorter.used();
13141328
sorter.sort_and_spill_in_mem_batches().await?;
13151329
let after = sorter.used();

0 commit comments

Comments
 (0)