Skip to content

Commit 525b207

Browse files
committed
Add hard guard and cached RSS for memory pool
- Add cached_resident_bytes() to memory_guard.rs (100ms refresh, CAS-guarded) as the single source of truth for all RSS checks.
- Add a hard guard at the top of try_grow: reject immediately when RSS exceeds the critical threshold (95%), forcing a spill before the CAS can approve further growth. This catches the malloc-first burst where hashbrown::reserve() allocates outside the pool under concurrent load.
- Rename kill -> critical to reflect its dual purpose: force-spill (recoverable, pre-CAS) and cancel-query (last resort, post-CAS-fail).
- All RSS checks (override, cancel, admission, hard guard) now use cached_resident_bytes() for a consistent view and <1ns amortized hot-path cost.

Tested: 25 concurrent high-cardinality GROUP BY queries on r8g.2xlarge (61.6GB RAM, 30.9GB pool limit) survived at 46.5GB peak with spill, vs. OOM at 60.7GB without the hard guard.
1 parent 2e32d1c commit 525b207

6 files changed

Lines changed: 221 additions & 48 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,11 @@ pub extern "C" fn df_set_min_target_partitions(value: i64) {
158158
/// Sets memory guard thresholds. Values are thresholds multiplied by 1000
159159
/// (e.g., 700 = 0.70, 850 = 0.85, 950 = 0.95).
160160
#[no_mangle]
161-
pub extern "C" fn df_set_memory_guard_thresholds(admission_x1000: i64, operator_x1000: i64, kill_x1000: i64) {
161+
pub extern "C" fn df_set_memory_guard_thresholds(admission_x1000: i64, operator_x1000: i64, critical_x1000: i64) {
162162
crate::memory_guard::set_thresholds(crate::memory_guard::MemoryThresholds {
163163
admission: admission_x1000 as f64 / 1000.0,
164164
operator: operator_x1000 as f64 / 1000.0,
165-
kill: kill_x1000 as f64 / 1000.0,
165+
critical: critical_x1000 as f64 / 1000.0,
166166
});
167167
}
168168

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,39 @@ impl MemoryPool for DynamicLimitPool {
112112
reservation: &MemoryReservation,
113113
additional: usize,
114114
) -> Result<(), DataFusionError> {
115+
// Hard guard: reject immediately if physical RSS exceeds the critical threshold.
116+
//
117+
// hashbrown::reserve() allocates via jemalloc BEFORE this try_grow is called
118+
// (malloc-first, ask-permission-later). Under concurrent load, multiple queries
119+
// grow multi-GB hash tables in parallel before any pool check occurs. This guard
120+
// catches that burst: even if pool accounting (CAS) would approve, we reject when
121+
// physical memory is critically high, forcing the operator to spill.
122+
//
123+
// Aligned with the critical threshold (95%) rather than 100% to give spill a
124+
// chance to recover before the cancel path fires. Without this, the CAS can pass
125+
// at 96% RSS (pool accounting lags jemalloc) and the cancel check never fires
126+
// (it's post-CAS-fail only).
127+
//
128+
// Uses a cached resident_bytes value (refreshed every 100ms) to avoid calling
129+
// jemalloc epoch.advance() on every batch — amortized cost <1µs.
130+
let limit = self.dynamic_limit.load(Ordering::Acquire);
131+
let resident = crate::memory_guard::cached_resident_bytes();
132+
if resident > 0 {
133+
let critical_threshold = crate::memory_guard::get_thresholds().critical;
134+
let critical_bytes = (limit as f64 * critical_threshold) as usize;
135+
if resident as usize > critical_bytes {
136+
self.tripped_count.fetch_add(1, Ordering::Relaxed);
137+
let used = self.used.load(Ordering::Relaxed);
138+
return Err(crate::native_error::pool_limit_error(
139+
additional,
140+
reservation.consumer().name(),
141+
reservation.size(),
142+
0,
143+
limit,
144+
));
145+
}
146+
}
147+
115148
let dynamic_limit = &self.dynamic_limit;
116149

117150
// Fast path: try the normal CAS against the pool limit.
@@ -148,21 +181,21 @@ impl MemoryPool for DynamicLimitPool {
148181
}
149182

150183
// Both pool and jemalloc confirm pressure. Check if RSS is critical —
151-
// if so, kill the query rather than spilling (spill can't help at 95%+).
152-
if crate::memory_guard::should_kill_query(limit) {
184+
// if so, cancel the query rather than spilling (spill can't help at 95%+).
185+
if crate::memory_guard::should_cancel_query(limit) {
153186
native_bridge_common::log_info!(
154-
"Memory KILL: RSS exceeds kill threshold for consumer [{}]. Cancelling query to protect node.",
187+
"Memory CANCEL: RSS exceeds critical threshold for consumer [{}]. Cancelling query to protect node.",
155188
reservation.consumer().name()
156189
);
157190
return Err(DataFusionError::ResourcesExhausted(format!(
158-
"Query cancelled: native memory RSS exceeds kill threshold ({}% of pool limit {}B). \
191+
"Query cancelled: native memory RSS exceeds critical threshold ({}% of pool limit {}B). \
159192
Reduce query concurrency or increase datafusion.memory_pool_limit_bytes.",
160-
(crate::memory_guard::get_thresholds().kill * 100.0) as u32,
193+
(crate::memory_guard::get_thresholds().critical * 100.0) as u32,
161194
limit
162195
)));
163196
}
164197

165-
// RSS between operator (85%) and kill (95%) — reject (operator will spill)
198+
// RSS between operator (85%) and critical (95%) — reject (operator will spill)
166199
self.tripped_count.fetch_add(1, Ordering::Relaxed);
167200
let used = self.used.load(Ordering::Relaxed);
168201
Err(crate::native_error::pool_limit_error(
@@ -409,4 +442,61 @@ mod tests {
409442
assert_eq!(handle.tripped_count(), 0);
410443
assert_eq!(pool.reserved(), 100_000);
411444
}
445+
446+
#[test]
fn test_hard_guard_rejects_when_rss_exceeds_critical() {
    // Use a pool limit that is expected to be smaller than the test process's
    // own RSS, so the hard guard's bound (95% of 20MB = 19MB) is exceeded.
    let (pool, handle) = new_pool(20 * 1024 * 1024); // 20MB

    let resident = crate::memory_guard::cached_resident_bytes();
    if resident <= 0 {
        return; // jemalloc not available in this test env
    }

    // The guard in try_grow rejects only when RSS is strictly greater than the
    // critical bound, so skip on `<=`: at exact equality the guard lets the
    // request through and asserting a rejection would be a false failure.
    let critical_bytes = (20.0 * 1024.0 * 1024.0 * 0.95) as i64;
    if resident <= critical_bytes {
        return; // RSS unexpectedly low — skip rather than false-fail
    }

    let consumer = MemoryConsumer::new("hard_guard_test");
    let mut reservation = consumer.register(&pool);

    // The hard guard fires before the CAS — even a tiny grow should be rejected
    let result = reservation.try_grow(1024);
    assert!(
        result.is_err(),
        "try_grow should fail when RSS ({}) exceeds critical threshold (95% of 20MB)",
        resident
    );
    assert!(
        handle.tripped_count() >= 1,
        "tripped_count should increment on hard guard rejection"
    );
}
478+
479+
#[test]
fn test_hard_guard_passes_when_rss_below_critical() {
    // With a 1TB limit the critical bound (95% of the limit) is expected to be
    // far above anything the test process can have resident, so the hard guard
    // must stay out of the way and the request must reach normal accounting.
    let one_tib = 1024 * 1024 * 1024 * 1024_usize; // 1TB
    let (pool, handle) = new_pool(one_tib);

    let mut reservation = MemoryConsumer::new("large_pool_test").register(&pool);

    // A 4KB request is tiny relative to the limit and should be granted.
    let outcome = reservation.try_grow(4096);
    assert!(
        outcome.is_ok(),
        "try_grow should succeed when RSS is well below 95% of 1TB pool limit"
    );

    // No rejection was recorded, and the pool accounts for exactly the grant.
    assert_eq!(
        handle.tripped_count(),
        0,
        "tripped_count should remain 0 when hard guard does not fire"
    );
    assert_eq!(pool.reserved(), 4096);
}
412502
}

sandbox/plugins/analytics-backend-datafusion/rust/src/memory_guard.rs

Lines changed: 109 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,54 @@
88

99
//! Unified jemalloc-based memory guard for pool override decisions.
1010
//!
11-
//! Provides a single entry point (`should_override`) that both the admission
12-
//! layer (`query_budget.rs`) and the operator layer (`memory.rs`) call before
13-
//! reducing partitions or triggering spill respectively.
11+
//! All RSS checks go through [`cached_resident_bytes()`] — a single source of
12+
//! truth refreshed at most once per 100ms. This avoids expensive jemalloc
13+
//! `epoch.advance()` calls on the hot path while keeping the memory picture
14+
//! consistent across all decision layers (hard guard, override, cancel, admission).
1415
//!
1516
//! Thresholds are configurable at runtime via `set_thresholds`.
1617
17-
use std::sync::atomic::{AtomicU64, Ordering};
18+
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
19+
use std::time::Instant;
20+
21+
// --- Cached RSS ---
22+
23+
const RESIDENT_CACHE_INTERVAL_MS: u64 = 100;
24+
static CACHED_RESIDENT: AtomicI64 = AtomicI64::new(0);
25+
static LAST_CHECK_MS: AtomicU64 = AtomicU64::new(0);
26+
static EPOCH_BASE: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
27+
28+
/// Returns jemalloc resident bytes, cached for up to 100ms.
29+
///
30+
/// Only one thread per interval pays the ~1-5µs `epoch.advance()` cost;
31+
/// all others get the cached value in <1ns (one atomic load).
32+
/// Used by all memory decision points: hard guard, override, cancel, admission.
33+
pub fn cached_resident_bytes() -> i64 {
34+
let base = EPOCH_BASE.get_or_init(Instant::now);
35+
let now_ms = base.elapsed().as_millis() as u64;
36+
let last = LAST_CHECK_MS.load(Ordering::Relaxed);
37+
if now_ms.saturating_sub(last) >= RESIDENT_CACHE_INTERVAL_MS {
38+
if LAST_CHECK_MS.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
39+
let r = native_bridge_common::allocator::resident_bytes();
40+
CACHED_RESIDENT.store(r, Ordering::Relaxed);
41+
return r;
42+
}
43+
}
44+
CACHED_RESIDENT.load(Ordering::Relaxed)
45+
}
46+
47+
// --- Thresholds ---
1848

1949
/// Minimum pool size (bytes) for jemalloc override to activate.
2050
/// Below this, the pool is assumed to be a unit test / benchmark with
2151
/// artificial limits — never override.
2252
const MIN_POOL_FOR_OVERRIDE: usize = 16 * 1024 * 1024; // 16MB
2353

2454
// Configurable thresholds stored as fixed-point (×1000) in atomics.
25-
// Defaults: admission=70%, operator=85%, kill=95%.
55+
// Defaults: admission=70%, operator=85%, critical=95%.
2656
static ADMISSION_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(700);
2757
static OPERATOR_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(850);
28-
static KILL_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(950);
58+
static CRITICAL_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(950);
2959

3060
/// Which layer is asking for the override check.
3161
#[derive(Debug, Clone, Copy)]
@@ -49,17 +79,18 @@ pub struct MemoryThresholds {
4979
pub admission: f64,
5080
/// Threshold for operator decisions (trigger spill). Default: 0.85
5181
pub operator: f64,
52-
/// Threshold for query kill (cancel in-flight query). Default: 0.95
53-
/// When RSS exceeds this, spill can't save the node — cancel the query.
54-
pub kill: f64,
82+
/// Critical memory threshold. Default: 0.95
83+
/// When RSS exceeds this: the hard guard forces spill (pre-CAS path), and
84+
/// the cancel path terminates the query (post-CAS-fail path, last resort).
85+
pub critical: f64,
5586
}
5687

5788
impl Default for MemoryThresholds {
5889
fn default() -> Self {
5990
Self {
6091
admission: 0.70,
6192
operator: 0.85,
62-
kill: 0.95,
93+
critical: 0.95,
6394
}
6495
}
6596
}
@@ -75,8 +106,8 @@ pub fn set_thresholds(thresholds: MemoryThresholds) {
75106
(thresholds.operator * 1000.0) as u64,
76107
Ordering::Release,
77108
);
78-
KILL_THRESHOLD_X1000.store(
79-
(thresholds.kill * 1000.0) as u64,
109+
CRITICAL_THRESHOLD_X1000.store(
110+
(thresholds.critical * 1000.0) as u64,
80111
Ordering::Release,
81112
);
82113
}
@@ -86,25 +117,28 @@ pub fn get_thresholds() -> MemoryThresholds {
86117
MemoryThresholds {
87118
admission: ADMISSION_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
88119
operator: OPERATOR_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
89-
kill: KILL_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
120+
critical: CRITICAL_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
90121
}
91122
}
92123

93-
/// Returns `true` if RSS exceeds the kill threshold — the query should be
94-
/// cancelled rather than spilled. Spill can't help at this pressure level;
95-
/// protecting the node is more important than completing the query.
124+
/// Returns `true` if RSS exceeds the critical threshold — the query should be
125+
/// cancelled. This is the last-resort path (post-CAS-fail, post-override-denied):
126+
/// the pool rejected, jemalloc confirms pressure, and spill alone can't recover
127+
/// fast enough. Cancel the query to protect the node.
96128
///
97-
/// Only called from the operator spill path (after pool rejection + override blocked).
98-
pub fn should_kill_query(pool_limit_bytes: usize) -> bool {
129+
/// The same critical threshold is used by the hard guard (pre-CAS) to force spill
130+
/// earlier — that path is recoverable. This path fires only when spill was already
131+
/// attempted or cannot help.
132+
pub fn should_cancel_query(pool_limit_bytes: usize) -> bool {
99133
if pool_limit_bytes < MIN_POOL_FOR_OVERRIDE {
100134
return false;
101135
}
102-
let resident = native_bridge_common::allocator::resident_bytes();
136+
let resident = cached_resident_bytes();
103137
if resident <= 0 {
104138
return false;
105139
}
106-
let kill_bytes = (pool_limit_bytes as u64).saturating_mul(KILL_THRESHOLD_X1000.load(Ordering::Acquire)) / 1000;
107-
resident >= kill_bytes as i64
140+
let critical_bytes = (pool_limit_bytes as u64).saturating_mul(CRITICAL_THRESHOLD_X1000.load(Ordering::Acquire)) / 1000;
141+
resident >= critical_bytes as i64
108142
}
109143

110144
/// Check whether jemalloc says physical memory has headroom, meaning the
@@ -123,12 +157,11 @@ pub fn should_kill_query(pool_limit_bytes: usize) -> bool {
123157
/// - `pool_limit_bytes`: the pool's configured limit
124158
/// - `context`: which layer is asking (determines threshold)
125159
pub fn should_override(pool_limit_bytes: usize, context: OverrideContext) -> bool {
126-
// Skip for tiny pools (unit tests, benchmarks with artificial limits)
127160
if pool_limit_bytes < MIN_POOL_FOR_OVERRIDE {
128161
return false;
129162
}
130163

131-
let resident = native_bridge_common::allocator::resident_bytes();
164+
let resident = cached_resident_bytes();
132165
if resident <= 0 {
133166
return false;
134167
}
@@ -156,7 +189,7 @@ pub fn is_memory_pressured(pool_limit_bytes: usize) -> bool {
156189
return false;
157190
}
158191

159-
let resident = native_bridge_common::allocator::resident_bytes();
192+
let resident = cached_resident_bytes();
160193
if resident <= 0 {
161194
return false;
162195
}
@@ -239,20 +272,20 @@ mod tests {
239272
let t = MemoryThresholds::default();
240273
assert!((t.admission - 0.70).abs() < 0.001);
241274
assert!((t.operator - 0.85).abs() < 0.001);
242-
assert!((t.kill - 0.95).abs() < 0.001);
275+
assert!((t.critical - 0.95).abs() < 0.001);
243276
}
244277

245278
#[test]
246279
fn set_and_get_thresholds() {
247280
set_thresholds(MemoryThresholds {
248281
admission: 0.60,
249282
operator: 0.90,
250-
kill: 0.97,
283+
critical: 0.97,
251284
});
252285
let t = get_thresholds();
253286
assert!((t.admission - 0.60).abs() < 0.001);
254287
assert!((t.operator - 0.90).abs() < 0.001);
255-
assert!((t.kill - 0.97).abs() < 0.001);
288+
assert!((t.critical - 0.97).abs() < 0.001);
256289
// Restore defaults
257290
set_thresholds(MemoryThresholds::default());
258291
}
@@ -303,6 +336,55 @@ mod tests {
303336
assert!(!is_memory_pressured(1_000_000)); // 1MB — below MIN_POOL_FOR_OVERRIDE
304337
}
305338

339+
#[test]
fn cached_resident_bytes_returns_positive() {
    // The test process is expected to run with jemalloc active, in which case
    // the reported resident size must be strictly positive.
    let rss = cached_resident_bytes();
    let is_positive = rss > 0;
    assert!(is_positive, "cached_resident_bytes() should return > 0 when jemalloc is active, got {}", rss);
}
345+
346+
#[test]
fn cached_resident_bytes_is_stable_within_interval() {
    // Two back-to-back calls normally fall inside one 100ms cache interval and
    // therefore return the same cached value. A single pair can still straddle
    // a refresh — the interval boundary may fall between the two calls, or
    // another test thread may refresh the shared static — so a one-shot
    // comparison is flaky. Refreshes happen at most once per interval, so
    // accept the first matching pair out of a few immediate attempts.
    const ATTEMPTS: usize = 5;
    let stable = (0..ATTEMPTS).any(|_| {
        let first = cached_resident_bytes();
        let second = cached_resident_bytes();
        first == second
    });
    assert!(
        stable,
        "Two immediate calls should return the same cached value"
    );
}
357+
358+
#[test]
fn should_cancel_query_false_for_small_pools() {
    // Every limit below sits under MIN_POOL_FOR_OVERRIDE (16MB); for such
    // pools the cancel check must answer `false` regardless of actual RSS.
    let undersized_limits = [
        1_000_000,        // 1MB
        8 * 1024 * 1024,  // 8MB
        15 * 1024 * 1024, // 15MB
    ];
    for limit in undersized_limits {
        assert!(!should_cancel_query(limit));
    }
}
365+
366+
#[test]
fn should_cancel_query_true_when_rss_exceeds_limit() {
    // With a 20MB pool limit (above MIN_POOL_FOR_OVERRIDE), the test process
    // RSS is expected to exceed 95% of 20MB = 19MB.
    let small_pool = 20 * 1024 * 1024; // 20MB

    // Sample the same cached value that should_cancel_query() decides on.
    // Reading the allocator directly here can disagree with the cache (stale
    // or not yet populated) and make the assertion below fail spuriously.
    let resident = cached_resident_bytes();
    if resident <= 0 {
        return; // jemalloc not available in this test env
    }

    // Only assert if RSS actually exceeds the critical threshold
    let critical_bytes = (small_pool as f64 * 0.95) as i64;
    if resident >= critical_bytes {
        assert!(
            should_cancel_query(small_pool),
            "should_cancel_query should return true when RSS ({}) exceeds 95% of pool ({})",
            resident,
            small_pool
        );
    }
}
387+
306388
#[test]
307389
fn override_respects_operator_vs_admission_threshold() {
308390
// Operator threshold (85%) is more permissive than admission (70%).

0 commit comments

Comments
 (0)