Skip to content

Commit 8f1a4f0

Browse files
committed
Use resident_bytes instead of allocated_bytes in memory guard
The memory guard's should_override() used jemalloc's allocated_bytes (live objects) to determine whether pool rejections were false positives. Under concurrent load, jemalloc retains freed pages in thread caches and arenas (dirty/muzzy decay), causing allocated to lag 10-20GB behind resident (physical RSS). This gap allowed overrides to fire when the system was near OOM. Changes: - should_override() now uses resident_bytes (physical RSS) for both admission and operator contexts - New is_memory_pressured() function for proactive admission check - query_budget: proactive RSS check at admission — when pool reserved >= admission threshold AND RSS confirms pressure, immediately reduce to min partitions (prevents concurrent burst) - Jemalloc RSS is only consulted when pool accounting shows >= threshold utilization (no overhead on the happy path) Behavior: - 0-70% pool utilized: no jemalloc call, full partitions (unchanged) - 70%+ pool AND RSS < threshold: override fires (stale accounting) - 70%+ pool AND RSS >= threshold: reduce partitions / trigger spill Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 4f9e89c commit 8f1a4f0

2 files changed

Lines changed: 128 additions & 3 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/rust/src/memory_guard.rs

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ pub fn get_thresholds() -> MemoryThresholds {
8383
/// Check whether jemalloc says physical memory has headroom, meaning the
8484
/// pool's rejection is a false positive (stale accounting).
8585
///
86+
/// Uses `resident_bytes` (physical RSS) instead of `allocated_bytes` (live objects).
87+
/// `allocated_bytes` undercounts true memory pressure because jemalloc retains
88+
/// freed pages in thread caches and arenas (dirty/muzzy decay). Under concurrent
89+
/// workloads, the gap between allocated and resident can be 10-20GB, causing the
90+
/// override to fire when the system is actually near OOM.
91+
///
8692
/// Returns `true` if the override should fire (proceed despite pool rejection).
8793
/// Returns `false` if pressure is real or stats are unavailable.
8894
///
@@ -95,8 +101,8 @@ pub fn should_override(pool_limit_bytes: usize, context: OverrideContext) -> boo
95101
return false;
96102
}
97103

98-
let allocated = native_bridge_common::allocator::allocated_bytes();
99-
if allocated <= 0 {
104+
let resident = native_bridge_common::allocator::resident_bytes();
105+
if resident <= 0 {
100106
return false;
101107
}
102108

@@ -106,7 +112,31 @@ pub fn should_override(pool_limit_bytes: usize, context: OverrideContext) -> boo
106112
};
107113

108114
let threshold_bytes = (pool_limit_bytes as u64 * threshold_x1000 / 1000) as i64;
109-
allocated < threshold_bytes
115+
resident < threshold_bytes
116+
}
117+
118+
/// Proactive admission check: returns `true` if jemalloc resident memory
119+
/// already exceeds the admission threshold (70% of pool limit by default).
120+
///
121+
/// Called BEFORE query execution (at budget acquisition) to reject or reduce
122+
/// concurrency early — before any hash table allocation occurs. This prevents
123+
/// the "20 queries all pass admission simultaneously" burst that causes OOM.
124+
///
125+
/// Cost: one `epoch.advance` + stat read (~1-5µs). Called once per query at
126+
/// admission, not per-batch.
127+
pub fn is_memory_pressured(pool_limit_bytes: usize) -> bool {
128+
if pool_limit_bytes < MIN_POOL_FOR_OVERRIDE {
129+
return false;
130+
}
131+
132+
let resident = native_bridge_common::allocator::resident_bytes();
133+
if resident <= 0 {
134+
return false;
135+
}
136+
137+
let threshold_x1000 = ADMISSION_THRESHOLD_X1000.load(Ordering::Acquire);
138+
let threshold_bytes = (pool_limit_bytes as u64 * threshold_x1000 / 1000) as i64;
139+
resident >= threshold_bytes
110140
}
111141

112142
// ---------------------------------------------------------------------------
@@ -203,4 +233,76 @@ mod tests {
203233
assert!(!should_override(1_000_000, OverrideContext::Admission));
204234
assert!(!should_override(1_000_000, OverrideContext::Operator));
205235
}
236+
237+
#[test]
238+
fn should_override_uses_resident_not_allocated() {
239+
// With a large pool (1TB), resident will always be below threshold
240+
// so override should fire (resident < threshold = "headroom available")
241+
let large_pool = 1024 * 1024 * 1024 * 1024; // 1TB
242+
// This test validates the function runs without error and uses resident.
243+
// On a test process with < 1TB RSS, override should fire (we have headroom).
244+
let result = should_override(large_pool, OverrideContext::Operator);
245+
assert!(result, "With 1TB pool limit, resident should be well below threshold — override should fire");
246+
}
247+
248+
#[test]
249+
fn is_memory_pressured_false_for_large_pool() {
250+
// With a 1TB pool, current process RSS is far below 70% → not pressured
251+
let large_pool = 1024 * 1024 * 1024 * 1024; // 1TB
252+
assert!(!is_memory_pressured(large_pool));
253+
}
254+
255+
#[test]
256+
fn is_memory_pressured_true_when_rss_exceeds_limit() {
257+
// Set pool limit to something well below current process RSS.
258+
// A Rust test process typically uses 50-200MB RSS, so a 20MB limit
259+
// should always be exceeded.
260+
let small_pool = 20 * 1024 * 1024; // 20MB — above MIN_POOL_FOR_OVERRIDE
261+
let resident = native_bridge_common::allocator::resident_bytes();
262+
if resident <= 0 {
263+
return; // jemalloc not available
264+
}
265+
// Only assert if RSS is actually above 70% of 20MB = 14MB (which it will be)
266+
if resident as usize > small_pool * 70 / 100 {
267+
assert!(is_memory_pressured(small_pool));
268+
}
269+
}
270+
271+
#[test]
272+
fn is_memory_pressured_skips_small_pools() {
273+
assert!(!is_memory_pressured(1_000_000)); // 1MB — below MIN_POOL_FOR_OVERRIDE
274+
}
275+
276+
#[test]
277+
fn override_respects_operator_vs_admission_threshold() {
278+
// Operator threshold (85%) is more permissive than admission (70%).
279+
// For a pool where RSS is between 70% and 85%:
280+
// - Admission override should NOT fire (RSS >= 70% threshold)
281+
// - Operator override SHOULD fire (RSS < 85% threshold)
282+
//
283+
// We can't precisely control RSS in a unit test, but we can verify
284+
// that the thresholds are read correctly by setting them and checking
285+
// behavior with known pool sizes.
286+
let resident = native_bridge_common::allocator::resident_bytes();
287+
if resident <= 0 {
288+
return; // jemalloc not available in this test env
289+
}
290+
let resident = resident as usize;
291+
292+
// Set pool limit so that resident is exactly between 70% and 85%
293+
// pool = resident / 0.77 (midpoint) → resident/pool ≈ 77%
294+
let pool_at_midpoint = (resident as f64 / 0.77) as usize;
295+
if pool_at_midpoint < MIN_POOL_FOR_OVERRIDE {
296+
return;
297+
}
298+
299+
// At 77% utilization: admission (70%) should NOT override, operator (85%) SHOULD override
300+
let admission_result = should_override(pool_at_midpoint, OverrideContext::Admission);
301+
let operator_result = should_override(pool_at_midpoint, OverrideContext::Operator);
302+
303+
// admission: resident (77%) >= threshold (70%) → NOT below → override = false
304+
assert!(!admission_result, "At 77% RSS, admission override should NOT fire (threshold 70%)");
305+
// operator: resident (77%) < threshold (85%) → below → override = true
306+
assert!(operator_result, "At 77% RSS, operator override SHOULD fire (threshold 85%)");
307+
}
206308
}

sandbox/plugins/analytics-backend-datafusion/rust/src/query_budget.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ pub fn acquire_budget_with_projection(
191191
///
192192
/// Tries to reserve a phantom at the configured parallelism. If the pool
193193
/// rejects it, iteratively halves target_partitions until it fits.
194+
///
195+
/// **Proactive RSS check**: before attempting pool reservation, consults
196+
/// jemalloc's resident bytes. If physical memory already exceeds the admission
197+
/// threshold (default 70% of pool limit), immediately reduces partitions to
198+
/// the minimum. This prevents the "20 queries all pass admission simultaneously"
199+
/// burst — each new query arriving when RSS is elevated starts at minimum
200+
/// parallelism, limiting its hash table growth and total memory footprint.
194201
fn acquire_budget_inner(
195202
pool: &Arc<dyn MemoryPool>,
196203
avg_row_bytes: usize,
@@ -202,6 +209,22 @@ fn acquire_budget_inner(
202209
let mut target_partitions = configured_target_partitions.max(min_partitions);
203210
let mut batch_size = configured_batch_size.max(MIN_BATCH_SIZE);
204211

212+
// Proactive admission guard: only consult jemalloc RSS when the pool's own
213+
// reservation accounting already shows pressure (>= admission threshold).
214+
// This avoids the ~5µs jemalloc epoch.advance cost on the happy path.
215+
if let Some(limit) = pool_limit(pool) {
216+
let reserved = pool.reserved();
217+
let threshold = crate::memory_guard::get_thresholds().admission;
218+
let admission_bytes = (limit as f64 * threshold) as usize;
219+
if reserved >= admission_bytes && crate::memory_guard::is_memory_pressured(limit) {
220+
native_bridge_common::log_info!(
221+
"Admission: pool reserved={}B (>={:.0}%) AND RSS at pressure — starting at min partitions={} batch_size={}",
222+
reserved, threshold * 100.0, min_partitions, batch_size
223+
);
224+
target_partitions = min_partitions;
225+
}
226+
}
227+
205228
loop {
206229
let phantom_bytes = compute_untracked_bytes_with_columns(
207230
target_partitions, batch_size, avg_row_bytes, num_columns,

0 commit comments

Comments
 (0)