Skip to content

Commit 65bfc20

Browse files
Expose tokio-metrics as datafusion plugin stats (opensearch-project#21303)
* Expose tokio-metrics as datafusion plugin stats Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> * refactor: Extract stats SPI to libs/plugin-stats-spi Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> * Move DF specific constructs to DF plugin Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> * Remove RUNTIME_METRICS_COUNT and resolve sandbox failures Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> * Add local_queue_depth to runtime metrics Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> * Use Handle::metrics() for point-in-time stats instead of interval deltas Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> --------- Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com> Co-authored-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
1 parent b6c4b9d commit 65bfc20

32 files changed

Lines changed: 2907 additions & 1 deletion

gradle/libs.versions.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ flatbuffers = "2.0.0"
109109
calcite = "1.41.0"
110110
calcite_os_rev = "1"
111111

112+
# property-based testing
113+
jqwik = "1.9.2"
114+
junit_jupiter = "5.11.3"
115+
junit_platform = "1.11.3"
112116
[libraries]
113117
antlr4-runtime = { group = "org.antlr", name = "antlr4-runtime", version.ref = "antlr4" }
114118
asm-analysis = { group = "org.ow2.asm", name = "asm-analysis", version.ref = "asm" }

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ thiserror = "1.0"
7070
async-trait = "0.1"
7171
bytes = "1"
7272
criterion = { version = "0.5", features = ["async_tokio"] }
73+
tokio-metrics = { version = "0.5", features = ["rt"] }
7374

7475
# Internal
7576
native-bridge-common = { path = "common" }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* SPI interfaces for plugin stats collection.
11+
* Contains only PluginStats (marker interface) and BackendStatsProvider.
12+
* Consumed by sandbox plugins that report backend statistics.
13+
*/
14+
15+
dependencies {
16+
api project(':libs:opensearch-core')
17+
api project(':libs:opensearch-common')
18+
}
19+
20+
testingConventions.enabled = false
21+
22+
tasks.named('forbiddenApisMain').configure {
23+
replaceSignatureFiles 'jdk-signatures'
24+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.stats;
10+
11+
/**
12+
* Each backend (DataFusion, Parquet, future engines) implements this interface
13+
* to provide its stats to the Mustang Stats Framework. The Analytics Plugin
14+
* discovers {@code BackendStatsProvider} implementations and iterates over them
15+
* to collect stats from all registered backends.
16+
*/
17+
public interface BackendStatsProvider {
18+
19+
/**
20+
* Returns the backend's identifier, e.g. {@code "datafusion"}, {@code "parquet"}.
21+
*
22+
* @return a non-null backend name
23+
*/
24+
String name();
25+
26+
/**
27+
* Returns the backend's stats object.
28+
*
29+
* @return a non-null {@link PluginStats} instance
30+
*/
31+
PluginStats getBackendStats();
32+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.stats;
10+
11+
/**
12+
* Marker interface for all backend stats types in the Mustang Stats Framework.
13+
*
14+
* <p>Intentionally empty — serves as the common type for
15+
* {@link BackendStatsProvider#getBackendStats()}. Each backend's top-level stats
16+
* class (e.g. {@code DataFusionStats}) implements this interface so the Analytics
17+
* Plugin can discover and iterate over them.
18+
*/
19+
public interface PluginStats {
20+
// marker — no methods
21+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* SPI stats types for the Mustang Stats Framework.
11+
*
12+
* <p>This package contains the stats interfaces shared between
13+
* the OpenSearch server and native backend plugins. Types here are visible to
14+
* both sides without requiring a plugin dependency.
15+
*
16+
* <p>Key types:
17+
* <ul>
18+
* <li>{@link org.opensearch.plugin.stats.PluginStats} — marker interface for all backend stats</li>
19+
* <li>{@link org.opensearch.plugin.stats.BackendStatsProvider} — interface for backends to provide stats</li>
20+
* </ul>
21+
*/
22+
package org.opensearch.plugin.stats;

sandbox/plugins/analytics-backend-datafusion/build.gradle

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ dependencies {
3939
// Shared native bridge lib (provides the unified .so and FFM SymbolLookup)
4040
implementation project(':sandbox:libs:dataformat-native')
4141

42+
// Canonical stats SPI classes (PluginStats, BackendStatsProvider)
43+
implementation project(':sandbox:libs:plugin-stats-spi')
44+
4245
// Provided at runtime by the parent analytics-engine plugin; compile-only to avoid jar hell.
4346
compileOnly project(':sandbox:libs:analytics-framework')
4447
// analytics-engine's RelNode types (OpenSearchStageInputScan) are referenced by the
@@ -192,6 +195,47 @@ testingConventions.enabled = false
192195

193196
tasks.named('forbiddenPatterns').configure {
194197
exclude '**/*.parquet'
198+
exclude '**/*.dylib'
199+
exclude '**/*.so'
200+
exclude '**/*.dll'
201+
}
202+
203+
// ---- Property-based tests (jqwik / JUnit 5 Platform) ----
204+
205+
sourceSets {
206+
propertyTest {
207+
java {
208+
srcDir 'src/propertyTest/java'
209+
}
210+
compileClasspath += sourceSets.main.output
211+
runtimeClasspath += sourceSets.main.output
212+
}
213+
}
214+
215+
configurations {
216+
propertyTestImplementation.extendsFrom implementation, compileOnly
217+
propertyTestRuntimeOnly.extendsFrom runtimeOnly
218+
}
219+
220+
dependencies {
221+
propertyTestImplementation "net.jqwik:jqwik:${versions.jqwik}"
222+
propertyTestImplementation "org.junit.jupiter:junit-jupiter-api:${versions.junit_jupiter}"
223+
propertyTestRuntimeOnly "org.junit.platform:junit-platform-launcher:${versions.junit_platform}"
224+
// Jackson for JSON parsing in property tests
225+
propertyTestImplementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
226+
}
227+
228+
tasks.register('propertyTest', Test) {
229+
description = 'Run jqwik property-based tests'
230+
group = 'verification'
231+
useJUnitPlatform {
232+
includeEngines 'jqwik'
233+
}
234+
testClassesDirs = sourceSets.propertyTest.output.classesDirs
235+
classpath = sourceSets.propertyTest.runtimeClasspath
236+
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
237+
// Disable the security manager for property tests (jqwik is not compatible)
238+
systemProperty 'tests.security.manager', 'false'
195239
}
196240

197241
tasks.matching { it.name == 'missingJavadoc' }.configureEach {
@@ -206,3 +250,7 @@ tasks.named('thirdPartyAudit').configure {
206250
'org.apache.calcite.server.ServerDdlExecutor'
207251
)
208252
}
253+
254+
// jqwik property tests don't ship with the randomized-testing framework that
255+
// forbiddenApis signatures reference — skip the check for this source set.
256+
tasks.matching { it.name == 'forbiddenApisPropertyTest' }.configureEach { enabled = false }

sandbox/plugins/analytics-backend-datafusion/rust/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
[package]
2+
# DataFusion analytics backend native library
23
name = "opensearch-datafusion"
34
version = "0.1.0"
45
edition = "2021"
@@ -46,6 +47,8 @@ thiserror = { workspace = true }
4647
# convert_tz UDF
4748
chrono-tz = "0.10"
4849

50+
tokio-metrics = { workspace = true }
51+
4952
[dev-dependencies]
5053
criterion = { workspace = true }
5154
tempfile = { workspace = true }

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ impl DedicatedExecutor {
154154
}
155155
}
156156

157+
/// Returns a clone of the underlying Tokio runtime `Handle`, if the
158+
/// executor has not been shut down. Used to create a
159+
/// `tokio_metrics::RuntimeMonitor` for the CPU runtime.
160+
pub fn handle(&self) -> Option<Handle> {
161+
let state = self.state.read();
162+
state.handle.clone()
163+
}
164+
157165
pub fn shutdown(&self) {
158166
let mut state = self.state.write();
159167
state.handle = None;

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,3 +651,59 @@ pub unsafe extern "C" fn df_execute_with_context(
651651
.map_err(|e| e.to_string())
652652
}
653653
}
654+
655+
// ---- Stats collection ----
656+
657+
/// Collects all native executor metrics into a caller-provided byte buffer.
658+
///
659+
/// The buffer must have capacity for at least `size_of::<DfStatsBuffer>()` bytes (224).
660+
/// Returns 0 on success.
661+
#[ffm_safe]
662+
#[no_mangle]
663+
pub unsafe extern "C" fn df_stats(out_ptr: *mut u8, out_cap: i64) -> i64 {
664+
use crate::stats::{layout, pack_runtime_metrics, pack_task_monitor, DfStatsBuffer, RuntimeMetricsRepr};
665+
use crate::task_monitors::{
666+
query_execution_monitor, stream_next_monitor,
667+
fetch_phase_monitor, segment_stats_monitor,
668+
};
669+
670+
if out_cap < 0 || (out_cap as usize) < layout::BUFFER_BYTE_SIZE {
671+
return Err(format!(
672+
"stats buffer too small: need {} but got {}",
673+
layout::BUFFER_BYTE_SIZE, out_cap
674+
));
675+
}
676+
677+
let mgr = get_rt_manager()?;
678+
679+
// IO runtime (always present)
680+
let io_runtime = pack_runtime_metrics(&mgr.io_monitor, mgr.io_runtime.handle());
681+
682+
// CPU runtime (optional — zeroed when absent)
683+
let cpu_runtime = if let Some(ref cpu_mon) = mgr.cpu_monitor {
684+
if let Some(cpu_handle) = mgr.cpu_executor.handle() {
685+
pack_runtime_metrics(cpu_mon, &cpu_handle)
686+
} else {
687+
RuntimeMetricsRepr::zeroed()
688+
}
689+
} else {
690+
RuntimeMetricsRepr::zeroed()
691+
};
692+
693+
let buf = DfStatsBuffer {
694+
io_runtime,
695+
cpu_runtime,
696+
query_execution: pack_task_monitor(query_execution_monitor()),
697+
stream_next: pack_task_monitor(stream_next_monitor()),
698+
fetch_phase: pack_task_monitor(fetch_phase_monitor()),
699+
segment_stats: pack_task_monitor(segment_stats_monitor()),
700+
};
701+
702+
// Copy struct bytes to caller buffer
703+
std::ptr::copy_nonoverlapping(
704+
&buf as *const DfStatsBuffer as *const u8,
705+
out_ptr,
706+
std::mem::size_of::<DfStatsBuffer>(),
707+
);
708+
Ok(0)
709+
}

0 commit comments

Comments
 (0)