Skip to content

Commit f34f5c8

Browse files
JanKaulclaude
andcommitted
ExternalSorter: add pool-relative pre-merge spill trigger (0.66)
The self-relative trigger (`sorter.used() > pool_limit / num_workers`) catches early-finisher partitions but misses the common Q18 pattern: all partitions transition with their own footprint just below the fair share (e.g. 470 MiB each on 8 GiB / 16 workers where self_share is 512 MiB), yet the pool itself is at 7+ GiB because other consumers are loaded. Self-relative never fires; the merge enters with no headroom. Port the pool-relative trigger from the original `reclaimer-sortmerge-hashagg` branch with a 0.66 threshold (vs. the old 0.33). On an 8 GiB pool that's ~5.3 GiB — high enough that we don't spill spuriously on light queries, but well below the "merge-can't-grow" zone Q18 hits at ~7 GiB. Now: `sorter.used() > self_share || pool_used > pool_threshold`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 74726fd commit f34f5c8

1 file changed

Lines changed: 34 additions & 10 deletions

File tree

  • datafusion/physical-plan/src/sorts

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,15 @@ use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
7777
use futures::{StreamExt, TryStreamExt};
7878
use log::{debug, trace};
7979

80+
/// Fraction of the memory pool above which a sort partition will
81+
/// proactively spill its in-memory batches before entering the
82+
/// non-reclaimable merge phase, regardless of how much that one
83+
/// partition is holding. Complements the self-relative check
84+
/// (`sorter.used() > pool_limit / num_workers`): catches the case
85+
/// where this partition is small but the pool is hot — the merge
86+
/// would still benefit from any bytes we can release now.
87+
const PRE_MERGE_SPILL_POOL_THRESHOLD: f64 = 0.66;
88+
8089
struct ExternalSorterMetrics {
8190
/// metrics
8291
baseline: BaselineMetrics,
@@ -1595,15 +1604,26 @@ impl ExecutionPlan for SortExec {
15951604
handle_reclaim_request(&mut sorter, resp_tx).await?;
15961605
}
15971606
// Proactive spill before entering the
1598-
// non-reclaimable merge phase. If this
1599-
// partition is still holding more than its
1600-
// fair share (`pool_limit / num_workers`),
1601-
// dump the in-memory batches so siblings can
1602-
// use the freed bytes. After this, `sort()`
1603-
// takes the multi-level-merge path with a
1604-
// tiny resident footprint instead of
1605-
// becoming a "fat monument" that holds memory
1606-
// for the rest of the query.
1607+
// non-reclaimable merge phase. Two signals,
1608+
// either fires:
1609+
//
1610+
// - Self-relative: this partition alone holds
1611+
// more than its fair per-worker share
1612+
// (`limit / num_workers`). Catches early
1613+
// finishers whose own footprint is the
1614+
// problem even when `pool.reserved()`
1615+
// hasn't yet risen.
1616+
//
1617+
// - Pool-relative: total pool usage is past
1618+
// the `PRE_MERGE_SPILL_POOL_THRESHOLD`
1619+
// fraction. Catches the "I'm small but the
1620+
// pool is hot" case — small partitions
1621+
// transition with the pool already
1622+
// saturated by upstream consumers, and the
1623+
// upcoming merge would have no headroom
1624+
// to grow into. Spilling now releases the
1625+
// batches before the merge commits to its
1626+
// non-reclaimable footprint.
16071627
if !sorter.in_mem_batches.is_empty()
16081628
&& let MemoryLimit::Finite(limit) =
16091629
sorter.runtime.memory_pool.memory_limit()
@@ -1613,7 +1633,11 @@ impl ExecutionPlan for SortExec {
16131633
.num_workers()
16141634
.max(1);
16151635
let self_share = limit / num_workers;
1616-
if sorter.used() > self_share {
1636+
let pool_threshold =
1637+
(limit as f64 * PRE_MERGE_SPILL_POOL_THRESHOLD) as usize;
1638+
let pool_used = sorter.runtime.memory_pool.reserved();
1639+
if sorter.used() > self_share || pool_used > pool_threshold
1640+
{
16171641
sorter.sort_and_spill_in_mem_batches().await?;
16181642
}
16191643
}

0 commit comments

Comments
 (0)