Skip to content

Commit 8be8cf4

Browse files
author
Pradeep L
committed
searchBack Pressure
Signed-off-by: Pradeep L <spradeel@amazon.com>
1 parent f7bd4f4 commit 8be8cf4

30 files changed

Lines changed: 1659 additions & 28 deletions

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.opensearch.analytics.spi;
1010

11+
import java.util.Collections;
1112
import java.util.List;
13+
import java.util.Map;
1214

1315
/**
1416
* SPI extension point for backend query engine plugins.
@@ -113,4 +115,18 @@ default FilterDelegationHandle getFilterDelegationHandle(List<DelegatedExpressio
113115
default void configureFilterDelegation(FilterDelegationHandle handle, BackendExecutionContext backendContext) {
114116
throw new UnsupportedOperationException("configureFilterDelegation not implemented for [" + name() + "]");
115117
}
118+
119+
/**
120+
* Returns a snapshot of this backend's currently-tracked queries, keyed by {@code contextId}.
121+
*
122+
* <p>The map is a point-in-time view — entries can register or drain concurrently on the
123+
* backend side. Implementations MUST return a non-null map (empty when nothing is tracked)
124+
* and SHOULD make it unmodifiable so callers cannot mutate backend state.
125+
*
126+
* <p>Default implementation returns an empty map so backends that do not track per-query
127+
* metrics don't have to opt in.
128+
*/
129+
default Map<Long, QueryExecutionMetrics> getActiveQueryMetrics() {
130+
return Collections.emptyMap();
131+
}
116132
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.spi;
10+
11+
/**
 * Per-query execution metrics reported by an analytics backend, captured as a snapshot.
 *
 * <p>The query's context id is deliberately absent: it is the key of the map returned by
 * {@link AnalyticsSearchBackendPlugin#getActiveQueryMetrics()}, so this record carries only
 * the remaining values — memory accounting, wall time, and the completed-but-not-drained flag.
 *
 * @param currentBytes bytes currently reserved by the query's memory pool
 * @param peakBytes    high-water mark of bytes reserved during the query's lifetime
 * @param wallNanos    wall-clock duration in nanoseconds, either live or frozen
 * @param completed    {@code true} once the query has finished executing while its entry is
 *                     still waiting to be drained from the backend's internal registry
 *
 * @opensearch.internal
 */
public record QueryExecutionMetrics(
    long currentBytes,
    long peakBytes,
    long wallNanos,
    boolean completed
) {}

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ opensearch-repository-azure = { path = "../../../plugins/native-repository-azure
8080
opensearch-repository-fs = { path = "../../../plugins/native-repository-fs/src/main/rust" }
8181

8282
[profile.release]
83-
lto = true
83+
lto = "thin"
8484
codegen-units = 1
8585
incremental = true
8686
debug = "line-tables-only"

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::slice;
1212
use std::str;
1313
use std::sync::Arc;
1414

15+
use log::info;
1516
use native_bridge_common::ffm_safe;
1617
use parking_lot::RwLock;
1718

@@ -202,6 +203,52 @@ pub extern "C" fn df_cancel_query(context_id: i64) {
202203
api::cancel_query(context_id);
203204
}
204205

206+
// ---------------------------------------------------------------------------
207+
// Per-query registry snapshot (two-phase)
208+
//
209+
// Java calls `df_query_registry_len` to size a buffer, then `df_query_registry_snapshot`
210+
// to populate it. See `query_tracker::WireQueryMetric` for the wire layout.
211+
// ---------------------------------------------------------------------------
212+
213+
/// Returns the current number of entries in the query registry.
214+
/// Value is racy — treat it as a sizing hint only.
215+
#[no_mangle]
216+
pub extern "C" fn df_query_registry_len() -> i64 {
217+
let len = crate::query_tracker::query_registry_len();
218+
info!("[nativemem-bp] ffm.df_query_registry_len -> {}", len);
219+
len as i64
220+
}
221+
222+
/// Copies up to `cap_entries` `WireQueryMetric`s into the caller-provided buffer.
223+
/// Returns the number of entries actually written.
224+
///
225+
/// Safety: `out_ptr` must be non-null, 8-byte aligned, and point to storage for
226+
/// at least `cap_entries * size_of::<WireQueryMetric>()` bytes.
227+
#[ffm_safe]
228+
#[no_mangle]
229+
pub unsafe extern "C" fn df_query_registry_snapshot(out_ptr: *mut u8, cap_entries: i64) -> i64 {
230+
use crate::query_tracker::{snapshot_query_registry, WireQueryMetric};
231+
232+
if cap_entries < 0 {
233+
return Err(format!("negative capacity: {cap_entries}"));
234+
}
235+
if cap_entries == 0 {
236+
info!("[nativemem-bp] ffm.df_query_registry_snapshot: capacity=0, nothing to write");
237+
return Ok(0);
238+
}
239+
if out_ptr.is_null() {
240+
return Err("null snapshot buffer".to_string());
241+
}
242+
let out: &mut [WireQueryMetric] =
243+
slice::from_raw_parts_mut(out_ptr as *mut WireQueryMetric, cap_entries as usize);
244+
let written = snapshot_query_registry(out);
245+
info!(
246+
"[nativemem-bp] ffm.df_query_registry_snapshot: wrote {} entries (capacity {})",
247+
written, cap_entries
248+
);
249+
Ok(written as i64)
250+
}
251+
205252
#[ffm_safe]
206253
#[no_mangle]
207254
pub unsafe extern "C" fn df_sql_to_substrait(

0 commit comments

Comments
 (0)