Skip to content

Commit 74726fd

Browse files
JanKaulclaude
andcommitted
port reclaimer_state tri-state from old reclaimer-sortmerge-hashagg branch
The previous design relied solely on the close()+drain pattern to tell the pool "this reclaimer can no longer free memory" — but the pool only discovers that by invoking reclaim() and getting Ok(0) back. Snapshot, sort by size, in-flight CAS, await reclaim, observe 0 — all wasted per dead victim. Worse: dead reclaimers with high `reserved` sort to the top, so they're picked first; live ones at lower reserved get deprioritized. The dump's "reclaimable_sum" was inflated by these phantom entries. Port the tri-state mechanism from `reclaimer-sortmerge-hashagg`: - `reclaimer::reclaimer_state` module: AVAILABLE / IN_FLIGHT / DISABLED. - `MemoryReclaimer::reclaimer_state() -> Option<Arc<AtomicU8>>`: default None; implementations that participate in walks return Some(arc), shared with their owning operator. - `TrackedConsumer.reclaim_in_flight: AtomicBool` removed; replaced with `TrackedConsumer.reclaimer_state: Arc<AtomicU8>` sourced from the reclaimer (or pool-allocated to AVAILABLE for in-flight dedup only). - `ReclaimerStateGuard`: RAII CAS AVAILABLE -> IN_FLIGHT; Drop only restores AVAILABLE if state is still IN_FLIGHT (leaves DISABLED). - `walk_reclaimers`: filter out DISABLED at snapshot time (logged as `skipped_disabled`); per-candidate CAS via ReclaimerStateGuard. - `report_all_consumers`: three-way split — live-reclaimer (genuinely reclaimable), disabled-reclaimer (sticky-retired, dead bytes), and no-reclaimer (never reclaimable). Operators sticky-set DISABLED right BEFORE closing reclaim_rx, so any walk that snapshots after the store filters them at selection time: - `ExternalSorterReclaimer` (sort.rs): in the post-input-loop block, store DISABLED then reclaim_rx.close() then drain. - `HashAggReclaimer` (row_hash.rs): in the streaming-merge transition block where reclaim_rx is taken+closed. - `MaterializingSmjReclaimer` (sort_merge_join): in the Exhausted state handler where reclaim_rx is taken+closed. Q18-relevant effect: at OOM time, the consumer dump now correctly separates the ~7 GiB of "appears reclaimable but actually returns 0" bytes from the actually-reclaimable bytes, and walks no longer waste round-trips on dead victims. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent af69ecd commit 74726fd

7 files changed

Lines changed: 289 additions & 80 deletions

File tree

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub use datafusion_common::{
4040
human_readable_count, human_readable_duration, human_readable_size, units,
4141
};
4242
pub use pool::*;
43-
pub use reclaimer::MemoryReclaimer;
43+
pub use reclaimer::{MemoryReclaimer, reclaimer_state};
4444

4545
/// Tracks and potentially limits memory use across operators during execution.
4646
///

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 124 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::memory_pool::{
1919
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation,
20-
human_readable_size,
20+
human_readable_size, reclaimer_state,
2121
};
2222
use datafusion_common::HashMap;
2323
use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
@@ -29,7 +29,7 @@ use std::pin::Pin;
2929
use std::sync::Arc;
3030
use std::{
3131
num::NonZeroUsize,
32-
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
32+
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
3333
};
3434
use tokio::sync::Semaphore;
3535

@@ -332,13 +332,55 @@ struct TrackedConsumer {
332332
/// Hook the pool walks on `try_grow_async` failure. `None` when
333333
/// the consumer registered without a reclaimer.
334334
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
335-
/// Set while a reclaim walk is currently calling this consumer's
336-
/// [`MemoryReclaimer::reclaim`]. Concurrent walks check-and-swap
337-
/// before invoking the reclaimer so that at most one in-flight
338-
/// reclaim per consumer happens at a time, even when the pool
339-
/// allows multiple walks concurrently. Skipped candidates fall
340-
/// through to the next victim in their list.
341-
reclaim_in_flight: AtomicBool,
335+
/// Shared tri-state flag controlling whether this reclaimer is
336+
/// pickable by the walk. Values are defined in
337+
/// [`reclaimer_state`]. The pool flips `AVAILABLE` ↔ `IN_FLIGHT`
338+
/// for cross-walk dedup; the reclaimer's owner may sticky-set
339+
/// `DISABLED` once it can no longer free memory (e.g. on entering
340+
/// the non-reclaimable merge phase). If the reclaimer didn't
341+
/// expose its own flag via [`MemoryReclaimer::reclaimer_state`],
342+
/// the pool allocates a private `AVAILABLE`-initialised flag used
343+
/// only for in-flight dedup.
344+
reclaimer_state: Arc<AtomicU8>,
345+
}
346+
347+
/// RAII guard for the [`IN_FLIGHT`] slot of a [`TrackedConsumer`]'s
348+
/// `reclaimer_state` flag. On `Drop` it only restores `AVAILABLE` if
349+
/// the state is still `IN_FLIGHT` — leaves a sticky `DISABLED` alone
350+
/// so a victim can retire mid-walk without being re-armed.
351+
///
352+
/// [`IN_FLIGHT`]: reclaimer_state::IN_FLIGHT
353+
struct ReclaimerStateGuard {
354+
flag: Arc<AtomicU8>,
355+
}
356+
357+
impl Drop for ReclaimerStateGuard {
358+
fn drop(&mut self) {
359+
let _ = self.flag.compare_exchange(
360+
reclaimer_state::IN_FLIGHT,
361+
reclaimer_state::AVAILABLE,
362+
Ordering::AcqRel,
363+
Ordering::Relaxed,
364+
);
365+
}
366+
}
367+
368+
impl ReclaimerStateGuard {
369+
/// Try to transition the flag from `AVAILABLE` to `IN_FLIGHT`.
370+
/// Fails on contention (another walker won the CAS) or on a
371+
/// sticky `DISABLED`.
372+
fn try_acquire(flag: &Arc<AtomicU8>) -> Option<Self> {
373+
flag.compare_exchange(
374+
reclaimer_state::AVAILABLE,
375+
reclaimer_state::IN_FLIGHT,
376+
Ordering::AcqRel,
377+
Ordering::Relaxed,
378+
)
379+
.ok()
380+
.map(|_| Self {
381+
flag: Arc::clone(flag),
382+
})
383+
}
342384
}
343385

344386
impl TrackedConsumer {
@@ -587,7 +629,16 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
587629
reserved: Default::default(),
588630
peak: Default::default(),
589631
reclaimer: consumer.reclaimer().cloned(),
590-
reclaim_in_flight: AtomicBool::new(false),
632+
// Source the tri-state flag from the reclaimer if it
633+
// exposes one (so the operator can sticky-set
634+
// DISABLED). Otherwise allocate a private AVAILABLE
635+
// flag used only for in-flight dedup across walks.
636+
reclaimer_state: consumer
637+
.reclaimer()
638+
.and_then(|r| r.reclaimer_state())
639+
.unwrap_or_else(|| {
640+
Arc::new(AtomicU8::new(reclaimer_state::AVAILABLE))
641+
}),
591642
},
592643
);
593644

@@ -777,13 +828,22 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
777828
min_reserved: usize,
778829
) -> usize {
779830
// Snapshot candidates while holding the read lock; release
780-
// before any await. We carry the `cid` and name along so we
781-
// can flip the per-victim in-flight flag without re-locking
782-
// and log meaningfully.
783-
let (mut candidates, total_eligible) = {
831+
// before any await. Carry the shared `reclaimer_state` flag
832+
// along so we can do the CAS without re-locking, and skip
833+
// sticky-DISABLED reclaimers at filter time instead of
834+
// wasting a `reclaim()` call that would return `Ok(0)`.
835+
let (mut candidates, total_eligible, skipped_disabled) = {
784836
let guard = self.tracked_consumers.read();
785837
let mut total = 0usize;
786-
let cands: Vec<(usize, String, usize, Arc<dyn MemoryReclaimer>, i32)> = guard
838+
let mut skipped_disabled = 0usize;
839+
let cands: Vec<(
840+
usize,
841+
String,
842+
usize,
843+
Arc<dyn MemoryReclaimer>,
844+
i32,
845+
Arc<AtomicU8>,
846+
)> = guard
787847
.iter()
788848
.filter_map(|(cid, tc)| {
789849
if Some(*cid) == exclude_consumer_id {
@@ -794,17 +854,30 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
794854
if reserved <= min_reserved {
795855
return None;
796856
}
857+
// Skip sticky-DISABLED reclaimers (operator has
858+
// retired itself). IN_FLIGHT is also non-AVAILABLE
859+
// but we still record those at the per-candidate
860+
// CAS site below for the `skipped_in_flight`
861+
// counter — they may transition back to AVAILABLE
862+
// by the time we reach them in the iteration.
863+
if tc.reclaimer_state.load(Ordering::Acquire)
864+
== reclaimer_state::DISABLED
865+
{
866+
skipped_disabled += 1;
867+
return None;
868+
}
797869
total = total.saturating_add(reserved);
798870
Some((
799871
*cid,
800872
tc.name.clone(),
801873
reserved,
802874
Arc::clone(reclaimer),
803875
reclaimer.priority(),
876+
Arc::clone(&tc.reclaimer_state),
804877
))
805878
})
806879
.collect();
807-
(cands, total)
880+
(cands, total, skipped_disabled)
808881
};
809882
// Order: priority DESC, then reserved-size DESC. No cap —
810883
// walk every eligible candidate; the in-flight flag handles
@@ -813,43 +886,33 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
813886

814887
debug!(
815888
"[reclaim-walk] excl={:?} min_reserved={} target={} candidates={} \
816-
eligible_bytes={} pool_reserved={}",
889+
eligible_bytes={} skipped_disabled={} pool_reserved={}",
817890
exclude_consumer_id,
818891
min_reserved,
819892
target,
820893
candidates.len(),
821894
total_eligible,
895+
skipped_disabled,
822896
self.inner.reserved(),
823897
);
824898

825899
let mut total_freed: usize = 0;
826900
let mut still_needed = target;
827901
let mut skipped_in_flight = 0usize;
828-
for (cid, name, _size, reclaimer, _prio) in candidates {
902+
for (cid, name, _size, reclaimer, _prio, flag) in candidates {
829903
if still_needed == 0 {
830904
break;
831905
}
832-
// Atomic check-and-set: if another walk is already
833-
// reclaiming this victim, skip and try the next one. We
834-
// re-fetch the entry by cid (held read lock briefly).
835-
let claimed = {
836-
let guard = self.tracked_consumers.read();
837-
guard
838-
.get(&cid)
839-
.is_some_and(|tc| !tc.reclaim_in_flight.swap(true, Ordering::AcqRel))
840-
};
841-
if !claimed {
906+
// CAS AVAILABLE → IN_FLIGHT. Fails if (a) another walker
907+
// got there first (in-flight) or (b) the owner sticky-set
908+
// DISABLED between snapshot and now. Either way, skip.
909+
// Guard's Drop restores AVAILABLE on scope exit (leaving
910+
// DISABLED alone).
911+
let Some(_g) = ReclaimerStateGuard::try_acquire(&flag) else {
842912
skipped_in_flight += 1;
843913
continue;
844-
}
914+
};
845915
let result = reclaimer.reclaim(still_needed).await;
846-
// Release the in-flight flag whether reclaim succeeded or
847-
// not. If the consumer was deregistered mid-walk
848-
// `get(&cid)` returns None; that's fine — the flag was on
849-
// a now-dropped value.
850-
if let Some(tc) = self.tracked_consumers.read().get(&cid) {
851-
tc.reclaim_in_flight.store(false, Ordering::Release);
852-
}
853916
match result {
854917
Ok(freed) => {
855918
debug!(
@@ -875,26 +938,33 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
875938
}
876939

877940
/// Diagnostic: format ALL tracked consumers with non-zero current
878-
/// reservations, grouped by whether they have a reclaimer or not.
941+
/// reservations, split by reclaimer state so dumps distinguish
942+
/// truly-reclaimable bytes from phantom (sticky-DISABLED) ones.
879943
/// Useful at OOM time to see the full pool composition beyond
880944
/// what [`Self::report_top`] shows.
881945
pub fn report_all_consumers(&self) -> String {
882946
let guard = self.tracked_consumers.read();
883-
let mut with_reclaimer: Vec<(usize, String, usize, usize)> = Vec::new();
947+
let mut live: Vec<(usize, String, usize, usize)> = Vec::new();
948+
let mut disabled: Vec<(usize, String, usize, usize)> = Vec::new();
884949
let mut without_reclaimer: Vec<(usize, String, usize, usize)> = Vec::new();
885950
for (cid, tc) in guard.iter() {
886951
let reserved = tc.reserved();
887952
if reserved == 0 {
888953
continue;
889954
}
890955
let entry = (*cid, tc.name.clone(), reserved, tc.peak());
891-
if tc.reclaimer.is_some() {
892-
with_reclaimer.push(entry);
893-
} else {
956+
if tc.reclaimer.is_none() {
894957
without_reclaimer.push(entry);
958+
} else if tc.reclaimer_state.load(Ordering::Acquire)
959+
== reclaimer_state::DISABLED
960+
{
961+
disabled.push(entry);
962+
} else {
963+
live.push(entry);
895964
}
896965
}
897-
with_reclaimer.sort_by_key(|e| std::cmp::Reverse(e.2));
966+
live.sort_by_key(|e| std::cmp::Reverse(e.2));
967+
disabled.sort_by_key(|e| std::cmp::Reverse(e.2));
898968
without_reclaimer.sort_by_key(|e| std::cmp::Reverse(e.2));
899969
let fmt = |v: &[(usize, String, usize, usize)]| -> (String, usize) {
900970
let sum = v.iter().map(|e| e.2).sum::<usize>();
@@ -911,15 +981,22 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
911981
.join("\n");
912982
(body, sum)
913983
};
914-
let (with_body, with_sum) = fmt(&with_reclaimer);
984+
let (live_body, live_sum) = fmt(&live);
985+
let (disabled_body, disabled_sum) = fmt(&disabled);
915986
let (without_body, without_sum) = fmt(&without_reclaimer);
916987
format!(
917-
"[consumer-dump] pool_reserved={} reclaimable_sum={} ({} consumers):\n{}\n\
988+
"[consumer-dump] pool_reserved={}\n\
989+
reclaimable_sum={} ({} consumers):\n{}\n\
990+
disabled_reclaimer_sum={} ({} consumers — sticky-retired, \
991+
pool can no longer free these):\n{}\n\
918992
non-reclaimable_sum={} ({} consumers):\n{}",
919993
human_readable_size(self.inner.reserved()),
920-
human_readable_size(with_sum),
921-
with_reclaimer.len(),
922-
with_body,
994+
human_readable_size(live_sum),
995+
live.len(),
996+
live_body,
997+
human_readable_size(disabled_sum),
998+
disabled.len(),
999+
disabled_body,
9231000
human_readable_size(without_sum),
9241001
without_reclaimer.len(),
9251002
without_body,

datafusion/execution/src/memory_pool/reclaimer.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,29 @@
2222
2323
use datafusion_common::Result;
2424
use std::fmt::Debug;
25+
use std::sync::Arc;
26+
use std::sync::atomic::AtomicU8;
27+
28+
/// Encoded values stored in the shared [`reclaimer_state`] tri-state
29+
/// flag. The flag is `Arc<AtomicU8>` so the operator (owner) and the
30+
/// pool (selector) can observe transitions atomically.
31+
///
32+
/// [`reclaimer_state`]: MemoryReclaimer::reclaimer_state
33+
pub mod reclaimer_state {
34+
/// Reclaimer is idle and may be selected as a walk victim.
35+
pub const AVAILABLE: u8 = 0;
36+
/// A pool walk is currently driving `reclaim` on this reclaimer.
37+
/// Other concurrent walks should skip it to avoid serializing on
38+
/// the same victim's mpsc/select! arm.
39+
pub const IN_FLIGHT: u8 = 1;
40+
/// Reclaimer has been sticky-retired by its owner — typically when
41+
/// the operator transitions past the phase where it can free
42+
/// memory (e.g., a sort entering its non-reclaimable merge phase).
43+
/// Once set, never returns to `AVAILABLE`; walks observe it
44+
/// immediately and skip without invoking `reclaim` (which would
45+
/// just return `Ok(0)` and waste a round-trip).
46+
pub const DISABLED: u8 = 2;
47+
}
2548

2649
/// Hook attached to a [`MemoryConsumer`] via
2750
/// [`MemoryConsumer::with_reclaimer`]. On
@@ -64,4 +87,21 @@ pub trait MemoryReclaimer: Send + Sync + Debug {
6487
fn priority(&self) -> i32 {
6588
0
6689
}
90+
91+
/// Optional shared flag the pool consults before invoking
92+
/// [`Self::reclaim`]. Implementations that participate in walks
93+
/// return `Some(arc)` and share the same `Arc<AtomicU8>` with the
94+
/// owning operator, so the operator can sticky-set
95+
/// [`reclaimer_state::DISABLED`] when it can no longer free memory
96+
/// (e.g., after closing its reclaim channel on the way into the
97+
/// merge phase). The pool then immediately stops picking this
98+
/// reclaimer as a victim instead of wasting a `reclaim()` call
99+
/// that would return `Ok(0)`.
100+
///
101+
/// Implementations that don't expose this flag (return `None`)
102+
/// get a pool-allocated `AVAILABLE` flag used only for in-flight
103+
/// dedup across concurrent walks.
104+
fn reclaimer_state(&self) -> Option<Arc<AtomicU8>> {
105+
None
106+
}
67107
}

0 commit comments

Comments
 (0)