Skip to content

Commit 9dc4717

Browse files
committed
use reclaimer handle
1 parent 9510938 commit 9dc4717

4 files changed

Lines changed: 148 additions & 96 deletions

File tree

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub use datafusion_common::{
3939
human_readable_count, human_readable_duration, human_readable_size, units,
4040
};
4141
pub use pool::*;
42-
pub use reclaimer::{MemoryReclaimer, reclaimer_state};
42+
pub use reclaimer::{MemoryReclaimer, ReclaimerHandle};
4343

4444
/// Tracks and potentially limits memory use across operators during execution.
4545
///
@@ -264,9 +264,11 @@ pub struct MemoryConsumer {
264264
name: String,
265265
can_spill: bool,
266266
id: usize,
267-
/// Reclaimer collected by reclaim-aware pools at register time. Not
268-
/// part of consumer identity (excluded from `Eq`/`Hash`).
269-
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
267+
/// Reclaim hook collected by reclaim-aware pools at register time.
268+
/// The handle is the operator's control over eligibility (sticky
269+
/// disable); the trait object is the action. Not part of consumer
270+
/// identity (excluded from `Eq`/`Hash`).
271+
reclaim: Option<(Arc<dyn MemoryReclaimer>, ReclaimerHandle)>,
270272
}
271273

272274
impl PartialEq for MemoryConsumer {
@@ -305,37 +307,50 @@ impl MemoryConsumer {
305307
name: name.into(),
306308
can_spill: false,
307309
id: Self::new_unique_id(),
308-
reclaimer: None,
310+
reclaim: None,
309311
}
310312
}
311313

312314
/// Clone this [`MemoryConsumer`] with a new unique id.
313315
///
314-
/// Drops any attached reclaimer: it is bound to the original operator's
315-
/// state and would target the wrong owner under a new id (and bypass
316-
/// the id-keyed requestor-self-skip in `try_grow_async`).
316+
/// Drops any attached reclaimer/handle: they are bound to the
317+
/// original operator's state and would target the wrong owner
318+
/// under a new id (and bypass the id-keyed requestor-self-skip in
319+
/// `try_grow_async`).
317320
pub fn clone_with_new_id(&self) -> Self {
318321
Self {
319322
name: self.name.clone(),
320323
can_spill: self.can_spill,
321324
id: Self::new_unique_id(),
322-
reclaimer: None,
325+
reclaim: None,
323326
}
324327
}
325328

326-
/// Attach a [`MemoryReclaimer`] and mark this consumer spill-capable.
327-
/// Pools without reclaim support ignore the reclaimer.
328-
pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
329+
/// Attach a [`MemoryReclaimer`] with its control [`ReclaimerHandle`]
330+
/// and mark this consumer spill-capable. Pools without reclaim
331+
/// support ignore both. The operator keeps a clone of `handle` to
332+
/// later [`ReclaimerHandle::disable`] itself once it can no longer
333+
/// free memory.
334+
pub fn with_reclaimer(
335+
self,
336+
reclaimer: Arc<dyn MemoryReclaimer>,
337+
handle: ReclaimerHandle,
338+
) -> Self {
329339
Self {
330340
can_spill: true,
331-
reclaimer: Some(reclaimer),
341+
reclaim: Some((reclaimer, handle)),
332342
..self
333343
}
334344
}
335345

336346
/// Returns the attached [`MemoryReclaimer`], if any.
337347
pub fn reclaimer(&self) -> Option<&Arc<dyn MemoryReclaimer>> {
338-
self.reclaimer.as_ref()
348+
self.reclaim.as_ref().map(|(r, _)| r)
349+
}
350+
351+
/// Returns the attached [`ReclaimerHandle`], if any.
352+
pub fn reclaimer_handle(&self) -> Option<&ReclaimerHandle> {
353+
self.reclaim.as_ref().map(|(_, h)| h)
339354
}
340355

341356
/// Return the unique id of this [`MemoryConsumer`]

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::memory_pool::reclaimer::reclaimer_state;
1919
use crate::memory_pool::{
2020
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation,
21-
human_readable_size,
21+
ReclaimerHandle, human_readable_size,
2222
};
2323
use datafusion_common::HashMap;
2424
use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
@@ -343,13 +343,13 @@ struct TrackedConsumer {
343343
reserved: AtomicUsize,
344344
peak: AtomicUsize,
345345
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
346-
/// Tri-state eligibility flag for [`reclaimer`], encoded per
347-
/// [`reclaimer_state`]. The pool flips `AVAILABLE` ↔ `IN_FLIGHT`
348-
/// for dedup; the reclaimer's owner may sticky-set `DISABLED` once
349-
/// it can no longer free memory. Shared `Arc` so the reclaimer
350-
/// side and the pool see the same cell. `None` reclaimer ⇒ flag
351-
/// is unused but still allocated.
352-
reclaimer_state: Arc<AtomicU8>,
346+
/// Eligibility handle for [`reclaimer`]. The pool flips
347+
/// `AVAILABLE` ↔ `IN_FLIGHT` on its internal cell for dedup; the
348+
/// reclaimer's owner sticky-sets `DISABLED` via [`ReclaimerHandle`].
349+
/// For consumers without a reclaimer the pool still allocates a
350+
/// fresh handle so the `self_guard` path in `try_grow_async` is
351+
/// uniform (cheap: one `Arc<AtomicU8>`).
352+
handle: ReclaimerHandle,
353353
}
354354

355355
/// RAII guard for the [`IN_FLIGHT`] slot of a [`TrackedConsumer`]'s
@@ -628,14 +628,15 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
628628
self.inner.register(consumer);
629629

630630
let reclaimer = consumer.reclaimer().cloned();
631-
// Reuse the reclaimer's own flag when it provides one — that
632-
// way the reclaimer side can sticky-set `DISABLED` and the
633-
// pool sees it on the next filter pass. Otherwise allocate a
634-
// fresh `AVAILABLE` flag for in-flight dedup only.
635-
let state = reclaimer
636-
.as_ref()
637-
.and_then(|r| r.reclaimer_state())
638-
.unwrap_or_else(|| Arc::new(AtomicU8::new(reclaimer_state::AVAILABLE)));
631+
// Use the operator-supplied handle when one is attached, so a
632+
// sticky `DISABLED` flip on the operator side is visible to
633+
// the pool on the next filter pass. For consumers without a
634+
// reclaimer, allocate a private handle so the in-flight guard
635+
// path stays uniform.
636+
let handle = consumer
637+
.reclaimer_handle()
638+
.cloned()
639+
.unwrap_or_else(ReclaimerHandle::new);
639640

640641
let mut guard = self.tracked_consumers.write();
641642
let existing = guard.insert(
@@ -646,7 +647,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
646647
reserved: Default::default(),
647648
peak: Default::default(),
648649
reclaimer,
649-
reclaimer_state: state,
650+
handle,
650651
},
651652
);
652653

@@ -738,23 +739,36 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
738739
.tracked_consumers
739740
.read()
740741
.get(&requestor_id)
741-
.and_then(|tc| ReclaimerStateGuard::try_acquire(&tc.reclaimer_state));
742+
.and_then(|tc| ReclaimerStateGuard::try_acquire(tc.handle.state()));
742743

743744
let mut retries: usize = 0;
744745
loop {
745-
// Snapshot reclaimers. Only consumers strictly larger than
746-
// the requestor are eligible: smaller-or-equal siblings would
747-
// free less than the requestor itself can, so the requestor
748-
// should self-spill instead. This rule also breaks the
749-
// mutual-reclaim cycle (A targets B while B targets A) — at
750-
// most one side of any pair can hold strictly more memory,
751-
// so the other side has no candidates and surfaces an error
752-
// for the caller's self-spill fallback. Filter out anyone
753-
// whose `reclaimer_state` flag is not `AVAILABLE` (in-flight or
754-
// sticky-disabled). Also count IN_FLIGHT siblings so we know
755-
// whether to wait briefly for them to finish before giving up.
756-
// Drop the read guard before awaiting any reclaim.
757-
let requestor_reserved = {
746+
// Snapshot reclaimers. Eligibility is meant to rank by
747+
// *reclaimable* bytes (what a consumer can free now), not
748+
// by total consumer reservation: a sorter whose buffer
749+
// just spilled may still carry a non-zero merge
750+
// reservation, but its reclaimable size is ~0 and
751+
// picking it would burn a round trip for nothing.
752+
// NOTE: `MemoryReclaimer` exposes no precise bound, so
753+
// the candidate filter below uses the tracked `reserved`
754+
// of each candidate as a conservative upper bound.
755+
//
756+
// Only consumers with *strictly more* reclaimable bytes
757+
// than the requestor are eligible: smaller-or-equal
758+
// siblings would free less than the requestor itself
759+
// could, so the requestor should self-spill instead.
760+
// This rule also breaks the mutual-reclaim cycle (A
761+
// targets B while B targets A) — at most one side of
762+
// any pair can hold strictly more, so the other side
763+
// has no candidates and surfaces an error for the
764+
// caller's self-spill fallback.
765+
//
766+
// Filter out anyone whose handle is not `AVAILABLE`
767+
// (in-flight or sticky-disabled). Also count IN_FLIGHT
768+
// siblings so we know whether to wait briefly for them
769+
// to finish before giving up. Drop the read guard
770+
// before awaiting any reclaim.
771+
let requestor_reclaimable = {
758772
let guard = self.tracked_consumers.read();
759773
guard
760774
.get(&requestor_id)
@@ -776,26 +790,27 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
776790
}
777791
// Track in-flight siblings (any size) so we can
778792
// decide whether a retry has any chance of helping.
779-
let state = tc.reclaimer_state.load(Ordering::Acquire);
793+
let state = tc.handle.state().load(Ordering::Acquire);
780794
if state == reclaimer_state::IN_FLIGHT {
781795
in_flight_seen += 1;
782796
}
783797
let reclaimer = tc.reclaimer.as_ref()?;
784-
if tc.reserved() <= requestor_reserved {
798+
let reclaimable = tc.reserved();
799+
if reclaimable <= requestor_reclaimable {
785800
return None;
786801
}
787802
if state != reclaimer_state::AVAILABLE {
788803
return None;
789804
}
790805
Some((
791-
tc.reserved(),
806+
reclaimable,
792807
Arc::clone(reclaimer),
793-
Arc::clone(&tc.reclaimer_state),
808+
Arc::clone(tc.handle.state()),
794809
))
795810
})
796811
.collect()
797812
};
798-
// Order: priority desc, then reservation size desc.
813+
// Order: priority desc, then reclaimable bytes desc.
799814
candidates.sort_by(|(lr, l, _), (rr, r, _)| {
800815
r.priority().cmp(&l.priority()).then_with(|| rr.cmp(lr))
801816
});

datafusion/execution/src/memory_pool/reclaimer.rs

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
use datafusion_common::Result;
2424
use std::fmt::Debug;
2525
use std::sync::Arc;
26-
use std::sync::atomic::AtomicU8;
26+
use std::sync::atomic::{AtomicU8, Ordering};
2727

28-
/// Encoded values stored in the [`reclaimer_state`] tri-state.
29-
///
30-
/// [`reclaimer_state`]: MemoryReclaimer::reclaimer_state
31-
pub mod reclaimer_state {
28+
/// Encoded values stored in a [`ReclaimerHandle`]'s tri-state cell.
29+
pub(crate) mod reclaimer_state {
3230
/// Reclaimer is idle and may be selected as a victim.
3331
pub const AVAILABLE: u8 = 0;
3432
/// A pool task is currently driving `reclaim` on this reclaimer.
@@ -39,6 +37,55 @@ pub mod reclaimer_state {
3937
pub const DISABLED: u8 = 2;
4038
}
4139

40+
/// Operator-side control over a reclaimer's eligibility in the pool.
41+
///
42+
/// Created by the operator and passed alongside a [`MemoryReclaimer`]
43+
/// to [`MemoryConsumer::with_reclaimer`]. The pool stores the same
44+
/// handle on its tracking entry, so any [`Self::disable`] call is
45+
/// observed on the next reclaim walk.
46+
///
47+
/// The internal `IN_FLIGHT` slot is managed by the pool for dedup and
48+
/// is opaque to operators; operators only flip `AVAILABLE` → `DISABLED`.
49+
///
50+
/// [`MemoryConsumer::with_reclaimer`]: super::MemoryConsumer::with_reclaimer
51+
#[derive(Debug, Clone)]
52+
pub struct ReclaimerHandle {
53+
state: Arc<AtomicU8>,
54+
}
55+
56+
impl ReclaimerHandle {
57+
/// Create a new handle in the `AVAILABLE` state.
58+
pub fn new() -> Self {
59+
Self {
60+
state: Arc::new(AtomicU8::new(reclaimer_state::AVAILABLE)),
61+
}
62+
}
63+
64+
/// Sticky-disable this reclaimer. After this call the pool will
65+
/// stop selecting the associated consumer as a reclaim victim.
66+
/// Idempotent.
67+
pub fn disable(&self) {
68+
self.state
69+
.store(reclaimer_state::DISABLED, Ordering::Release);
70+
}
71+
72+
/// Returns `true` once [`Self::disable`] has been called.
73+
pub fn is_disabled(&self) -> bool {
74+
self.state.load(Ordering::Acquire) == reclaimer_state::DISABLED
75+
}
76+
77+
/// Pool-internal access to the shared state cell.
78+
pub(crate) fn state(&self) -> &Arc<AtomicU8> {
79+
&self.state
80+
}
81+
}
82+
83+
impl Default for ReclaimerHandle {
84+
fn default() -> Self {
85+
Self::new()
86+
}
87+
}
88+
4289
/// Hook attached to a [`MemoryConsumer`] via
4390
/// [`MemoryConsumer::with_reclaimer`]. On
4491
/// [`MemoryPool::try_grow_async`] failure the pool walks registered
@@ -54,18 +101,18 @@ pub mod reclaimer_state {
54101
/// - Not capture `Arc<MemoryReservation>` / `Arc<MemoryConsumer>`
55102
/// (creates a cycle that blocks `unregister`); use channels or `Weak`.
56103
///
104+
/// Eligibility (whether the pool may pick this reclaimer at all) is
105+
/// controlled out-of-band via the [`ReclaimerHandle`] passed at
106+
/// registration. The trait itself is narrowly about the action
107+
/// ("please free up to `target` bytes now") and its priority.
108+
///
57109
/// [`MemoryConsumer`]: super::MemoryConsumer
58110
/// [`MemoryConsumer::with_reclaimer`]: super::MemoryConsumer::with_reclaimer
59111
/// [`MemoryPool::try_grow_async`]: super::MemoryPool::try_grow_async
60112
/// [`MemoryReservation::shrink`]: super::MemoryReservation::shrink
61113
/// [`MemoryReservation::free`]: super::MemoryReservation::free
62114
#[async_trait::async_trait]
63115
pub trait MemoryReclaimer: Send + Sync + Debug {
64-
/// Upper bound on bytes this reclaimer can free. `None` = unknown.
65-
fn reclaimable_bytes(&self) -> Option<usize> {
66-
None
67-
}
68-
69116
/// Free up to `target` bytes; return the amount actually released.
70117
/// See trait-level contract.
71118
async fn reclaim(&self, target: usize) -> Result<usize>;
@@ -74,15 +121,4 @@ pub trait MemoryReclaimer: Send + Sync + Debug {
74121
fn priority(&self) -> i32 {
75122
0
76123
}
77-
78-
/// Optional shared tri-state flag controlling whether the pool
79-
/// currently considers this reclaimer eligible. Values are defined
80-
/// in [`reclaimer_state`]. Returning `Some(arc)` lets the
81-
/// reclaimer's owner flip itself to `DISABLED` once it can no
82-
/// longer free memory (e.g., on entering a merge phase), which
83-
/// the pool observes immediately. Returning `None` lets the pool
84-
/// allocate its own private flag — used only for in-flight dedup.
85-
fn reclaimer_state(&self) -> Option<Arc<AtomicU8>> {
86-
None
87-
}
88124
}

0 commit comments

Comments
 (0)