Skip to content

Commit ed68a0b

Browse files
committed
Use resident_bytes instead of allocated_bytes in memory guard
The memory guard's should_override() used jemalloc's allocated_bytes (live objects) to determine whether pool rejections were false positives. Under concurrent load, jemalloc retains freed pages in thread caches and arenas (dirty/muzzy decay), causing allocated to lag 10-20GB behind resident (physical RSS). This gap allowed overrides to fire when the system was near OOM.

Changes:
- should_override() now uses resident_bytes (physical RSS) for both admission and operator contexts
- New is_memory_pressured() function for proactive admission check
- query_budget: proactive RSS check at admission — when pool reserved >= admission threshold AND RSS confirms pressure, immediately reduce to min partitions (prevents concurrent burst)
- Jemalloc RSS is only consulted when pool accounting shows >= threshold utilization (no overhead on the happy path)

Behavior:
- 0-70% pool utilized: no jemalloc call, full partitions (unchanged)
- 70%+ pool AND RSS < threshold: override fires (stale accounting)
- 70%+ pool AND RSS >= threshold: reduce partitions / trigger spill

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 4f9e89c commit ed68a0b

6 files changed

Lines changed: 214 additions & 16 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,14 @@ pub extern "C" fn df_set_min_target_partitions(value: i64) {
155155
api::set_min_target_partitions(value);
156156
}
157157

158-
/// Sets memory guard thresholds. admission_x1000 and operator_x1000 are
159-
/// the thresholds multiplied by 1000 (e.g., 700 = 0.70, 850 = 0.85).
158+
/// Sets memory guard thresholds. Values are thresholds multiplied by 1000
159+
/// (e.g., 700 = 0.70, 850 = 0.85, 950 = 0.95).
160160
#[no_mangle]
161-
pub extern "C" fn df_set_memory_guard_thresholds(admission_x1000: i64, operator_x1000: i64) {
161+
pub extern "C" fn df_set_memory_guard_thresholds(admission_x1000: i64, operator_x1000: i64, kill_x1000: i64) {
162162
crate::memory_guard::set_thresholds(crate::memory_guard::MemoryThresholds {
163163
admission: admission_x1000 as f64 / 1000.0,
164164
operator: operator_x1000 as f64 / 1000.0,
165+
kill: kill_x1000 as f64 / 1000.0,
165166
});
166167
}
167168

sandbox/plugins/analytics-backend-datafusion/rust/src/memory_guard.rs

Lines changed: 138 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ use std::sync::atomic::{AtomicU64, Ordering};
2222
const MIN_POOL_FOR_OVERRIDE: usize = 16 * 1024 * 1024; // 16MB
2323

2424
// Configurable thresholds stored as fixed-point (×1000) in atomics.
25-
// Defaults: admission=70%, operator=85%.
25+
// Defaults: admission=70%, operator=85%, kill=95%.
2626
static ADMISSION_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(700);
2727
static OPERATOR_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(850);
28+
static KILL_THRESHOLD_X1000: AtomicU64 = AtomicU64::new(950);
2829

2930
/// Which layer is asking for the override check.
3031
#[derive(Debug, Clone, Copy)]
@@ -39,22 +40,26 @@ pub enum OverrideContext {
3940

4041
/// Configurable memory thresholds for jemalloc override decisions.
4142
///
42-
/// Both values are fractions (0.0–1.0) of the pool limit:
43-
/// - If `jemalloc_allocated < threshold × pool_limit`, the pool's rejection
43+
/// Values are fractions (0.0–1.0) of the pool limit:
44+
/// - If `jemalloc_resident < threshold × pool_limit`, the pool's rejection
4445
/// is considered a false positive and the operation proceeds.
4546
#[derive(Debug, Clone, Copy)]
4647
pub struct MemoryThresholds {
4748
/// Threshold for admission decisions (reduce partitions). Default: 0.70
4849
pub admission: f64,
4950
/// Threshold for operator decisions (trigger spill). Default: 0.85
5051
pub operator: f64,
52+
/// Threshold for query kill (cancel in-flight query). Default: 0.95
53+
/// When RSS exceeds this, spill can't save the node — cancel the query.
54+
pub kill: f64,
5155
}
5256

5357
impl Default for MemoryThresholds {
5458
fn default() -> Self {
5559
Self {
5660
admission: 0.70,
5761
operator: 0.85,
62+
kill: 0.95,
5863
}
5964
}
6065
}
@@ -70,19 +75,47 @@ pub fn set_thresholds(thresholds: MemoryThresholds) {
7075
(thresholds.operator * 1000.0) as u64,
7176
Ordering::Release,
7277
);
78+
KILL_THRESHOLD_X1000.store(
79+
(thresholds.kill * 1000.0) as u64,
80+
Ordering::Release,
81+
);
7382
}
7483

7584
/// Read current thresholds.
7685
pub fn get_thresholds() -> MemoryThresholds {
7786
MemoryThresholds {
7887
admission: ADMISSION_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
7988
operator: OPERATOR_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
89+
kill: KILL_THRESHOLD_X1000.load(Ordering::Acquire) as f64 / 1000.0,
90+
}
91+
}
92+
93+
/// Returns `true` if RSS exceeds the kill threshold — the query should be
94+
/// cancelled rather than spilled. Spill can't help at this pressure level;
95+
/// protecting the node is more important than completing the query.
96+
///
97+
/// Only called from the operator spill path (after pool rejection + override blocked).
98+
pub fn should_kill_query(pool_limit_bytes: usize) -> bool {
99+
if pool_limit_bytes < MIN_POOL_FOR_OVERRIDE {
100+
return false;
101+
}
102+
let resident = native_bridge_common::allocator::resident_bytes();
103+
if resident <= 0 {
104+
return false;
80105
}
106+
let kill_bytes = (pool_limit_bytes as u64 * KILL_THRESHOLD_X1000.load(Ordering::Acquire) / 1000) as i64;
107+
resident >= kill_bytes
81108
}
82109

83110
/// Check whether jemalloc says physical memory has headroom, meaning the
84111
/// pool's rejection is a false positive (stale accounting).
85112
///
113+
/// Uses `resident_bytes` (physical RSS) instead of `allocated_bytes` (live objects).
114+
/// `allocated_bytes` undercounts true memory pressure because jemalloc retains
115+
/// freed pages in thread caches and arenas (dirty/muzzy decay). Under concurrent
116+
/// workloads, the gap between allocated and resident can be 10-20GB, causing the
117+
/// override to fire when the system is actually near OOM.
118+
///
86119
/// Returns `true` if the override should fire (proceed despite pool rejection).
87120
/// Returns `false` if pressure is real or stats are unavailable.
88121
///
@@ -95,8 +128,8 @@ pub fn should_override(pool_limit_bytes: usize, context: OverrideContext) -> boo
95128
return false;
96129
}
97130

98-
let allocated = native_bridge_common::allocator::allocated_bytes();
99-
if allocated <= 0 {
131+
let resident = native_bridge_common::allocator::resident_bytes();
132+
if resident <= 0 {
100133
return false;
101134
}
102135

@@ -106,7 +139,31 @@ pub fn should_override(pool_limit_bytes: usize, context: OverrideContext) -> boo
106139
};
107140

108141
let threshold_bytes = (pool_limit_bytes as u64 * threshold_x1000 / 1000) as i64;
109-
allocated < threshold_bytes
142+
resident < threshold_bytes
143+
}
144+
145+
/// Proactive admission check: returns `true` if jemalloc resident memory
146+
/// already exceeds the admission threshold (70% of pool limit by default).
147+
///
148+
/// Called BEFORE query execution (at budget acquisition) to reject or reduce
149+
/// concurrency early — before any hash table allocation occurs. This prevents
150+
/// the "20 queries all pass admission simultaneously" burst that causes OOM.
151+
///
152+
/// Cost: one `epoch.advance` + stat read (~1-5µs). Called once per query at
153+
/// admission, not per-batch.
154+
pub fn is_memory_pressured(pool_limit_bytes: usize) -> bool {
155+
if pool_limit_bytes < MIN_POOL_FOR_OVERRIDE {
156+
return false;
157+
}
158+
159+
let resident = native_bridge_common::allocator::resident_bytes();
160+
if resident <= 0 {
161+
return false;
162+
}
163+
164+
let threshold_x1000 = ADMISSION_THRESHOLD_X1000.load(Ordering::Acquire);
165+
let threshold_bytes = (pool_limit_bytes as u64 * threshold_x1000 / 1000) as i64;
166+
resident >= threshold_bytes
110167
}
111168

112169
// ---------------------------------------------------------------------------
@@ -182,17 +239,20 @@ mod tests {
182239
let t = MemoryThresholds::default();
183240
assert!((t.admission - 0.70).abs() < 0.001);
184241
assert!((t.operator - 0.85).abs() < 0.001);
242+
assert!((t.kill - 0.95).abs() < 0.001);
185243
}
186244

187245
#[test]
fn set_and_get_thresholds() {
    set_thresholds(MemoryThresholds {
        admission: 0.60,
        operator: 0.90,
        kill: 0.97,
    });
    let t = get_thresholds();
    // The thresholds are process-wide state shared with the other tests.
    // Restore the defaults before asserting so that a failed assertion cannot
    // leave the modified values in place.
    set_thresholds(MemoryThresholds::default());

    assert!((t.admission - 0.60).abs() < 0.001);
    assert!((t.operator - 0.90).abs() < 0.001);
    assert!((t.kill - 0.97).abs() < 0.001);
}
@@ -203,4 +263,76 @@ mod tests {
203263
assert!(!should_override(1_000_000, OverrideContext::Admission));
204264
assert!(!should_override(1_000_000, OverrideContext::Operator));
205265
}
266+
267+
#[test]
fn should_override_uses_resident_not_allocated() {
    // `should_override` returns false whenever resident stats are unavailable,
    // so the assertion below only holds when jemalloc reports a positive RSS.
    // Bail out otherwise, as the other RSS-dependent tests do.
    if native_bridge_common::allocator::resident_bytes() <= 0 {
        return; // jemalloc not available in this test env
    }

    // With a large pool (1TB), a test process's resident memory is far below
    // the operator threshold, so the override should fire
    // (resident < threshold = "headroom available").
    let large_pool: usize = 1024 * 1024 * 1024 * 1024; // 1TB
    let result = should_override(large_pool, OverrideContext::Operator);
    assert!(result, "With 1TB pool limit, resident should be well below threshold — override should fire");
}
277+
278+
#[test]
fn is_memory_pressured_false_for_large_pool() {
    // With a 1TB pool, current process RSS is far below 70% → not pressured.
    // Also holds when resident stats are unavailable (the function returns false).
    let large_pool: usize = 1024 * 1024 * 1024 * 1024; // 1TB
    assert!(!is_memory_pressured(large_pool));
}
284+
285+
#[test]
fn is_memory_pressured_true_when_rss_exceeds_limit() {
    // Use a pool limit expected to sit well below the test process's RSS:
    // 20MB is just above MIN_POOL_FOR_OVERRIDE.
    let small_pool: usize = 20 * 1024 * 1024;
    let resident = native_bridge_common::allocator::resident_bytes();
    if resident <= 0 {
        return; // jemalloc not available
    }
    // The assertion is conditional: it only runs when the measured RSS is
    // above 70% of 20MB (= 14MB). If RSS is lower, the test passes vacuously.
    if resident as usize > small_pool * 70 / 100 {
        assert!(is_memory_pressured(small_pool));
    }
}
300+
301+
#[test]
fn is_memory_pressured_skips_small_pools() {
    // 1MB is below MIN_POOL_FOR_OVERRIDE, so the check short-circuits to false
    // without consulting resident stats.
    assert!(!is_memory_pressured(1_000_000));
}
305+
306+
#[test]
fn override_respects_operator_vs_admission_threshold() {
    // Operator threshold (85%) is more permissive than admission (70%).
    // For a pool where RSS is between 70% and 85%:
    //   - Admission override should NOT fire (RSS >= 70% threshold)
    //   - Operator override SHOULD fire (RSS < 85% threshold)
    //
    // RSS cannot be controlled from a unit test, so the pool limit is sized
    // around the measured RSS instead. This assumes the default thresholds are
    // in effect and that RSS stays roughly stable between the reads below.
    let resident = native_bridge_common::allocator::resident_bytes();
    if resident <= 0 {
        return; // jemalloc not available in this test env
    }
    let resident = resident as usize;

    // pool = resident / 0.77 (midpoint of 70% and 85%) → resident/pool ≈ 77%
    let pool_at_midpoint = (resident as f64 / 0.77) as usize;
    if pool_at_midpoint < MIN_POOL_FOR_OVERRIDE {
        return;
    }

    let admission_result = should_override(pool_at_midpoint, OverrideContext::Admission);
    let operator_result = should_override(pool_at_midpoint, OverrideContext::Operator);

    // admission: resident (77%) >= threshold (70%) → NOT below → override = false
    assert!(!admission_result, "At 77% RSS, admission override should NOT fire (threshold 70%)");
    // operator: resident (77%) < threshold (85%) → below → override = true
    assert!(operator_result, "At 77% RSS, operator override SHOULD fire (threshold 85%)");
}
206338
}

sandbox/plugins/analytics-backend-datafusion/rust/src/query_budget.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ pub fn acquire_budget_with_projection(
191191
///
192192
/// Tries to reserve a phantom at the configured parallelism. If the pool
193193
/// rejects it, iteratively halves target_partitions until it fits.
194+
///
195+
/// **Proactive RSS check**: before attempting pool reservation, consults
196+
/// jemalloc's resident bytes. If physical memory already exceeds the admission
197+
/// threshold (default 70% of pool limit), immediately reduces partitions to
198+
/// the minimum. This prevents the "20 queries all pass admission simultaneously"
199+
/// burst — each new query arriving when RSS is elevated starts at minimum
200+
/// parallelism, limiting its hash table growth and total memory footprint.
194201
fn acquire_budget_inner(
195202
pool: &Arc<dyn MemoryPool>,
196203
avg_row_bytes: usize,
@@ -202,6 +209,45 @@ fn acquire_budget_inner(
202209
let mut target_partitions = configured_target_partitions.max(min_partitions);
203210
let mut batch_size = configured_batch_size.max(MIN_BATCH_SIZE);
204211

212+
// Proactive admission guard: only consult jemalloc RSS when the pool's own
213+
// reservation accounting already shows pressure (>= admission threshold).
214+
// This avoids the ~5µs jemalloc epoch.advance cost on the happy path.
215+
if let Some(limit) = pool_limit(pool) {
216+
let reserved = pool.reserved();
217+
let thresholds = crate::memory_guard::get_thresholds();
218+
let admission_bytes = (limit as f64 * thresholds.admission) as usize;
219+
if reserved >= admission_bytes {
220+
let resident = native_bridge_common::allocator::resident_bytes();
221+
if resident > 0 {
222+
let operator_bytes = (limit as f64 * thresholds.operator) as i64;
223+
if resident >= operator_bytes {
224+
// RSS at operator threshold (85%) — reject immediately.
225+
// Even at min partitions this query will hit spill on first batch.
226+
// Better to reject with clear backpressure than admit and fail slowly.
227+
native_bridge_common::log_info!(
228+
"Admission REJECTED: pool reserved={}B, RSS={}B >= operator threshold ({:.0}% of {}B). Node under memory pressure.",
229+
reserved, resident, thresholds.operator * 100.0, limit
230+
);
231+
return Err(crate::native_error::admission_rejected_error(
232+
compute_untracked_bytes_with_columns(min_partitions, MIN_BATCH_SIZE, avg_row_bytes, num_columns),
233+
min_partitions,
234+
MIN_BATCH_SIZE,
235+
avg_row_bytes,
236+
));
237+
}
238+
// RSS between admission (70%) and operator (85%) — reduce partitions
239+
let admission_threshold_bytes = (limit as f64 * thresholds.admission) as i64;
240+
if resident >= admission_threshold_bytes {
241+
native_bridge_common::log_info!(
242+
"Admission: pool reserved={}B, RSS={}B >= admission threshold ({:.0}%) — reducing to min partitions={}",
243+
reserved, resident, thresholds.admission * 100.0, min_partitions
244+
);
245+
target_partitions = min_partitions;
246+
}
247+
}
248+
}
249+
}
250+
205251
loop {
206252
let phantom_bytes = compute_untracked_bytes_with_columns(
207253
target_partitions, batch_size, avg_row_bytes, num_columns,

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,20 @@ static String deriveSpillLimitDefault() {
228228
Setting.Property.Dynamic
229229
);
230230

231+
/**
232+
* RSS fraction above which in-flight queries are cancelled rather than spilled.
233+
* At this level, spill can't prevent OOM — protecting the node is priority.
234+
* Default: 0.95.
235+
*/
236+
public static final Setting<Double> DATAFUSION_MEMORY_GUARD_KILL_THRESHOLD = Setting.doubleSetting(
237+
"datafusion.memory_guard.kill_threshold",
238+
0.95,
239+
0.0,
240+
1.0,
241+
Setting.Property.NodeScope,
242+
Setting.Property.Dynamic
243+
);
244+
231245
/**
232246
* Selects how the coordinator-reduce sink hands shard responses to the native runtime.
233247
* <ul>
@@ -308,12 +322,15 @@ public Collection<Object> createComponents(
308322
.addSettingsUpdateConsumer(DATAFUSION_MEMORY_GUARD_ADMISSION_THRESHOLD, v -> updateMemoryGuardThresholds());
309323
clusterService.getClusterSettings()
310324
.addSettingsUpdateConsumer(DATAFUSION_MEMORY_GUARD_OPERATOR_THRESHOLD, v -> updateMemoryGuardThresholds());
325+
clusterService.getClusterSettings()
326+
.addSettingsUpdateConsumer(DATAFUSION_MEMORY_GUARD_KILL_THRESHOLD, v -> updateMemoryGuardThresholds());
311327

312328
// Apply initial values
313329
NativeBridge.setMinTargetPartitions(DATAFUSION_MIN_TARGET_PARTITIONS.get(settings));
314330
NativeBridge.setMemoryGuardThresholds(
315331
DATAFUSION_MEMORY_GUARD_ADMISSION_THRESHOLD.get(settings),
316-
DATAFUSION_MEMORY_GUARD_OPERATOR_THRESHOLD.get(settings)
332+
DATAFUSION_MEMORY_GUARD_OPERATOR_THRESHOLD.get(settings),
333+
DATAFUSION_MEMORY_GUARD_KILL_THRESHOLD.get(settings)
317334
);
318335

319336
this.datafusionSettings = new DatafusionSettings(clusterService);
@@ -451,8 +468,9 @@ void updateMinTargetPartitions(int value) {
451468
private void updateMemoryGuardThresholds() {
452469
double admission = clusterService.getClusterSettings().get(DATAFUSION_MEMORY_GUARD_ADMISSION_THRESHOLD);
453470
double operator = clusterService.getClusterSettings().get(DATAFUSION_MEMORY_GUARD_OPERATOR_THRESHOLD);
454-
NativeBridge.setMemoryGuardThresholds(admission, operator);
455-
logger.info("Updated DataFusion memory guard thresholds: admission={}, operator={}", admission, operator);
471+
double kill = clusterService.getClusterSettings().get(DATAFUSION_MEMORY_GUARD_KILL_THRESHOLD);
472+
NativeBridge.setMemoryGuardThresholds(admission, operator, kill);
473+
logger.info("Updated DataFusion memory guard thresholds: admission={}, operator={}, kill={}", admission, operator, kill);
456474
}
457475

458476
@Override

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public final class DatafusionSettings {
173173
DataFusionPlugin.DATAFUSION_MIN_TARGET_PARTITIONS,
174174
DataFusionPlugin.DATAFUSION_MEMORY_GUARD_ADMISSION_THRESHOLD,
175175
DataFusionPlugin.DATAFUSION_MEMORY_GUARD_OPERATOR_THRESHOLD,
176+
DataFusionPlugin.DATAFUSION_MEMORY_GUARD_KILL_THRESHOLD,
176177

177178
// Cache settings — metadata and statistics cache configuration
178179
CacheSettings.METADATA_CACHE_SIZE_LIMIT,

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public final class NativeBridge {
180180

181181
SET_MEMORY_GUARD_THRESHOLDS = linker.downcallHandle(
182182
lib.find("df_set_memory_guard_thresholds").orElseThrow(),
183-
FunctionDescriptor.ofVoid(ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG)
183+
FunctionDescriptor.ofVoid(ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG)
184184
);
185185

186186
CREATE_READER = linker.downcallHandle(
@@ -680,10 +680,10 @@ public static void setMinTargetPartitions(int value) {
680680
}
681681
}
682682

683-
/** Sets the memory guard admission and operator thresholds (0.0–1.0). */
684-
public static void setMemoryGuardThresholds(double admission, double operator) {
683+
/** Sets the memory guard admission, operator, and kill thresholds (0.0–1.0). */
684+
public static void setMemoryGuardThresholds(double admission, double operator, double kill) {
685685
try {
686-
SET_MEMORY_GUARD_THRESHOLDS.invokeExact((long) (admission * 1000), (long) (operator * 1000));
686+
SET_MEMORY_GUARD_THRESHOLDS.invokeExact((long) (admission * 1000), (long) (operator * 1000), (long) (kill * 1000));
687687
} catch (Throwable t) {
688688
logger.debug("Failed to set memory guard thresholds", t);
689689
}

0 commit comments

Comments
 (0)