Skip to content

Commit 0ee9f72

Browse files
Add lucene ffm callbacks to task tracking (opensearch-project#21610)
* Add lucene ffm callbacks to task tracking Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 446a1c9 commit 0ee9f72

10 files changed

Lines changed: 404 additions & 94 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,10 @@ default FilterDelegationHandle getFilterDelegationHandle(List<DelegatedExpressio
113113
default void configureFilterDelegation(FilterDelegationHandle handle, BackendExecutionContext backendContext) {
114114
throw new UnsupportedOperationException("configureFilterDelegation not implemented for [" + name() + "]");
115115
}
116+
117+
/**
118+
* Install a thread tracker for attribution of delegation callbacks executing on foreign threads.
119+
* Called after {@link #configureFilterDelegation}. Pass {@code null} to clear.
120+
*/
121+
default void setDelegationThreadTracker(DelegationThreadTracker tracker) {}
116122
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.spi;
10+
11+
/**
12+
* Tracks thread-level resource attribution for delegation callbacks executing
13+
* on foreign threads (e.g., DataFusion/Tokio workers invoking Lucene via FFM).
14+
*
15+
* @opensearch.internal
16+
*/
17+
public interface DelegationThreadTracker {
18+
19+
/**
20+
* Signal that delegation work has started on the current thread.
21+
*
22+
* @return thread id to pass to {@link #trackEnd}, or {@code -1} if tracking is inactive
23+
*/
24+
long trackStart();
25+
26+
/**
27+
* Signal that delegation work has finished on the given thread.
28+
*
29+
* @param threadId the value returned by {@link #trackStart}
30+
*/
31+
void trackEnd(long threadId);
32+
}

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs

Lines changed: 30 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@
1414
//! usage independently.
1515
//!
1616
//! [`QueryTrackingContext`] owns the per-query pool and tracker, auto-registers
17-
//! in the global [`QueryRegistry`] on creation, and marks the query completed
18-
//! on [`Drop`]. The registry retains completed entries so Java can retrieve
19-
//! final metrics via JNI before explicitly draining them.
17+
//! in the global [`QueryRegistry`] on creation, and removes the entry
18+
//! on [`Drop`].
2019
2120
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2221
use std::sync::Arc;
@@ -151,14 +150,6 @@ impl QueryTracker {
151150

152151
static QUERY_REGISTRY: Lazy<DashMap<i64, Arc<QueryTracker>>> = Lazy::new(DashMap::new);
153152

154-
/// Remove a completed tracker from the registry and return it.
155-
/// Called from JNI after Java has consumed the metrics.
156-
pub fn drain_completed_query(context_id: i64) -> Option<Arc<QueryTracker>> {
157-
QUERY_REGISTRY
158-
.remove_if(&context_id, |_, t| t.is_completed())
159-
.map(|(_, t)| t)
160-
}
161-
162153
/// Fire the cancellation token for the given context_id.
163154
/// No-op for unknown or already-completed queries.
164155
pub fn cancel_query(context_id: i64) {
@@ -179,8 +170,7 @@ pub fn get_cancellation_token(context_id: i64) -> Option<CancellationToken> {
179170
/// Per-query context that owns the memory pool and tracker.
180171
///
181172
/// - On creation: registers the tracker in the global registry.
182-
/// - On [`Drop`]: marks the tracker completed and logs final metrics.
183-
/// The tracker stays in the registry for JNI retrieval.
173+
/// - On [`Drop`]: removes the tracker from the registry and logs final metrics.
184174
///
185175
/// For `context_id == 0` (unset), no tracking is performed.
186176
#[derive(Debug)]
@@ -237,6 +227,7 @@ impl Drop for QueryTrackingContext {
237227
tracker.memory_pool.current_bytes(),
238228
tracker.memory_pool.peak_bytes(),
239229
);
230+
QUERY_REGISTRY.remove(&tracker.context_id);
240231
}
241232
}
242233
}
@@ -324,41 +315,30 @@ mod tests {
324315
}
325316

326317
#[test]
327-
fn test_context_registers_in_registry() {
318+
fn test_context_registers_and_removes_on_drop() {
328319
let global = make_global_pool(10_000);
329320
let ctx_id = 50_000;
330321
let ctx = QueryTrackingContext::new(ctx_id, global);
331322
assert!(ctx.memory_pool().is_some());
332323
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
333324

334325
drop(ctx);
335-
// Still in registry after drop (completed, not drained)
336-
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
337-
assert!(QUERY_REGISTRY.get(&ctx_id).unwrap().is_completed());
338-
339-
// Drain removes it
340-
let drained = drain_completed_query(ctx_id);
341-
assert!(drained.is_some());
326+
// Removed from registry after drop
342327
assert!(!QUERY_REGISTRY.contains_key(&ctx_id));
343328
}
344329

345330
#[test]
346-
fn test_drop_marks_completed_and_freezes_wall_time() {
331+
fn test_drop_removes_from_registry() {
347332
let global = make_global_pool(10_000);
348333
let ctx_id = 50_001;
349334
let ctx = QueryTrackingContext::new(ctx_id, global);
350335

336+
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
351337
thread::sleep(Duration::from_millis(50));
352338
drop(ctx);
353339

354-
let tracker = QUERY_REGISTRY.get(&ctx_id).unwrap();
355-
assert!(tracker.is_completed());
356-
let frozen = tracker.wall_secs();
357-
thread::sleep(Duration::from_millis(50));
358-
assert!((tracker.wall_secs() - frozen).abs() < 0.001);
359-
360-
drop(tracker);
361-
QUERY_REGISTRY.remove(&ctx_id);
340+
// Entry is gone after drop
341+
assert!(!QUERY_REGISTRY.contains_key(&ctx_id));
362342
}
363343

364344
#[test]
@@ -373,26 +353,7 @@ mod tests {
373353
assert!(t2 - t1 >= 0.04);
374354

375355
drop(_ctx);
376-
QUERY_REGISTRY.remove(&ctx_id);
377-
}
378-
379-
#[test]
380-
fn test_drain_returns_none_for_active_query() {
381-
let global = make_global_pool(10_000);
382-
let ctx_id = 50_003;
383-
let _ctx = QueryTrackingContext::new(ctx_id, global);
384-
385-
// Cannot drain while still active
386-
assert!(drain_completed_query(ctx_id).is_none());
387-
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
388-
389-
drop(_ctx);
390-
assert!(drain_completed_query(ctx_id).is_some());
391-
}
392-
393-
#[test]
394-
fn test_drain_nonexistent_is_none() {
395-
assert!(drain_completed_query(99_999).is_none());
356+
// drop removes from registry automatically
396357
}
397358

398359
#[test]
@@ -416,21 +377,15 @@ mod tests {
416377
assert_eq!(qp.current_bytes(), 2000);
417378
assert_eq!(qp.peak_bytes(), 8000);
418379

419-
// Drop context — marks completed
380+
// Drop context — removes from registry
420381
drop(ctx);
421-
{
422-
let tracker = QUERY_REGISTRY.get(&ctx_id).unwrap();
423-
assert!(tracker.is_completed());
424-
assert_eq!(tracker.memory_pool.peak_bytes(), 8000);
425-
assert!(tracker.wall_secs() > 0.0);
426-
}
382+
assert!(!QUERY_REGISTRY.contains_key(&ctx_id));
427383

428-
// Drop reservation — current goes to 0, peak stays
429-
drop(reservation);
430-
assert_eq!(qp.current_bytes(), 0);
384+
// Pool still works (Arc kept alive by qp)
431385
assert_eq!(qp.peak_bytes(), 8000);
432386

433-
QUERY_REGISTRY.remove(&ctx_id);
387+
drop(reservation);
388+
assert_eq!(qp.current_bytes(), 0);
434389
}
435390

436391
#[test]
@@ -457,22 +412,21 @@ mod tests {
457412

458413
// Drop one, other keeps running
459414
drop(ctx_a);
460-
assert!(QUERY_REGISTRY.get(&ctx_a_id).unwrap().is_completed());
461-
assert!(!QUERY_REGISTRY.get(&ctx_b_id).unwrap().is_completed());
415+
assert!(!QUERY_REGISTRY.contains_key(&ctx_a_id));
416+
assert!(QUERY_REGISTRY.contains_key(&ctx_b_id));
462417

463418
drop(res_a);
464419
drop(res_b);
465420
drop(ctx_b);
466-
QUERY_REGISTRY.remove(&ctx_a_id);
467-
QUERY_REGISTRY.remove(&ctx_b_id);
421+
assert!(!QUERY_REGISTRY.contains_key(&ctx_b_id));
468422
}
469423

470424
// -----------------------------------------------------------------------
471425
// Query lifecycle tests (simulating stream completion and error paths)
472426
// -----------------------------------------------------------------------
473427

474428
#[test]
475-
fn test_context_completes_on_normal_drop_with_stream() {
429+
fn test_context_removes_on_normal_drop_with_stream() {
476430
// Simulates: query succeeds → stream is consumed → handle dropped
477431
let global = make_global_pool(1_000_000);
478432
let ctx_id = 50_010;
@@ -485,25 +439,21 @@ mod tests {
485439
// Simulate allocations during stream consumption
486440
reservation.try_grow(4000).unwrap();
487441
assert_eq!(qp.peak_bytes(), 4000);
488-
assert!(!QUERY_REGISTRY.get(&ctx_id).unwrap().is_completed());
442+
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
489443

490444
// Stream fully consumed — reservation and context dropped together
491-
// (mirrors QueryStreamHandle being dropped in streamClose)
492445
drop(reservation);
493446
drop(ctx);
494447

495-
let tracker = QUERY_REGISTRY.get(&ctx_id).unwrap();
496-
assert!(tracker.is_completed());
497-
assert_eq!(tracker.memory_pool.peak_bytes(), 4000);
498-
assert_eq!(tracker.memory_pool.current_bytes(), 0);
499-
assert!(tracker.wall_secs() > 0.0);
500-
501-
drop(tracker);
502-
QUERY_REGISTRY.remove(&ctx_id);
448+
// Removed from registry
449+
assert!(!QUERY_REGISTRY.contains_key(&ctx_id));
450+
// Pool stats still accessible via Arc
451+
assert_eq!(qp.peak_bytes(), 4000);
452+
assert_eq!(qp.current_bytes(), 0);
503453
}
504454

505455
#[test]
506-
fn test_context_completes_on_error_drop() {
456+
fn test_context_removes_on_error_drop() {
507457
// Simulates: query execution fails → context dropped without
508458
// explicit cleanup (the error path in executeQueryPhaseAsync)
509459
let global = make_global_pool(1_000_000);
@@ -512,18 +462,9 @@ mod tests {
512462
{
513463
let ctx = QueryTrackingContext::new(ctx_id, global);
514464
let _pool = ctx.memory_pool();
515-
assert!(!QUERY_REGISTRY.get(&ctx_id).unwrap().is_completed());
516-
517-
// Simulate error: context goes out of scope, no stream was created
518-
} // ctx dropped here — should still mark completed
465+
assert!(QUERY_REGISTRY.contains_key(&ctx_id));
466+
} // ctx dropped here — removes from registry
519467

520-
let tracker = QUERY_REGISTRY.get(&ctx_id).unwrap();
521-
assert!(tracker.is_completed());
522-
assert_eq!(tracker.memory_pool.peak_bytes(), 0);
523-
assert_eq!(tracker.memory_pool.current_bytes(), 0);
524-
525-
drop(tracker);
526-
let drained = drain_completed_query(ctx_id);
527-
assert!(drained.is_some());
468+
assert!(!QUERY_REGISTRY.contains_key(&ctx_id));
528469
}
529470
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,4 +573,9 @@ public void configureFilterDelegation(FilterDelegationHandle handle, BackendExec
573573
// (createProvider, createCollector, collectDocs, release*) route to it.
574574
FilterTreeCallbacks.setHandle(handle);
575575
}
576+
577+
@Override
578+
public void setDelegationThreadTracker(org.opensearch.analytics.spi.DelegationThreadTracker tracker) {
579+
FilterTreeCallbacks.setThreadTracker(tracker);
580+
}
576581
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.analytics.spi.DelegationThreadTracker;
1415
import org.opensearch.analytics.spi.FilterDelegationHandle;
1516

1617
import java.lang.foreign.MemorySegment;
@@ -35,6 +36,7 @@ public final class FilterTreeCallbacks {
3536
private static final Logger LOGGER = LogManager.getLogger(FilterTreeCallbacks.class);
3637

3738
private static final AtomicReference<FilterDelegationHandle> HANDLE = new AtomicReference<>();
39+
private static final AtomicReference<DelegationThreadTracker> TRACKER = new AtomicReference<>();
3840

3941
private FilterTreeCallbacks() {}
4042

@@ -47,12 +49,31 @@ public static void setHandle(FilterDelegationHandle handle) {
4749
HANDLE.set(handle);
4850
}
4951

52+
/**
53+
* Install or clear the thread tracker for resource attribution.
54+
*/
55+
public static void setThreadTracker(DelegationThreadTracker tracker) {
56+
TRACKER.set(tracker);
57+
}
58+
59+
private static long trackStart() {
60+
DelegationThreadTracker t = TRACKER.get();
61+
return (t != null) ? t.trackStart() : -1;
62+
}
63+
64+
private static void trackEnd(long threadId) {
65+
if (threadId < 0) return;
66+
DelegationThreadTracker t = TRACKER.get();
67+
if (t != null) t.trackEnd(threadId);
68+
}
69+
5070
// ── Provider lifecycle (cold path, once per query) ────────────────
5171

5272
/**
5373
* {@code createProvider(annotationId) -> providerKey|-1}.
5474
*/
5575
public static int createProvider(int annotationId) {
76+
long tid = trackStart();
5677
try {
5778
FilterDelegationHandle handle = HANDLE.get();
5879
if (handle == null) {
@@ -62,6 +83,8 @@ public static int createProvider(int annotationId) {
6283
} catch (Throwable throwable) {
6384
LOGGER.error("createProvider failed for annotationId=" + annotationId, throwable);
6485
return -1;
86+
} finally {
87+
trackEnd(tid);
6588
}
6689
}
6790

@@ -85,6 +108,7 @@ public static void releaseProvider(int providerKey) {
85108
* {@code createCollector(providerKey, segmentOrd, minDoc, maxDoc) -> collectorKey|-1}.
86109
*/
87110
public static int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc) {
111+
long tid = trackStart();
88112
try {
89113
FilterDelegationHandle handle = HANDLE.get();
90114
if (handle == null) {
@@ -103,13 +127,16 @@ public static int createCollector(int providerKey, int segmentOrd, int minDoc, i
103127
throwable
104128
);
105129
return -1;
130+
} finally {
131+
trackEnd(tid);
106132
}
107133
}
108134

109135
/**
110136
* {@code collectDocs(collectorKey, minDoc, maxDoc, outPtr, outWordCap) -> wordsWritten|-1}.
111137
*/
112138
public static long collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment outPtr, long outWordCap) {
139+
long tid = trackStart();
113140
try {
114141
FilterDelegationHandle handle = HANDLE.get();
115142
if (handle == null) {
@@ -125,6 +152,8 @@ public static long collectDocs(int collectorKey, int minDoc, int maxDoc, MemoryS
125152
throwable
126153
);
127154
return -1L;
155+
} finally {
156+
trackEnd(tid);
128157
}
129158
}
130159

0 commit comments

Comments
 (0)