Skip to content

Commit 8e0c266

Browse files
authored
Merge pull request #2 from bharath-techie/filter_delegation_e2e_rebased
making tests pass and fixing arrow stream alignment issue
2 parents 360ef34 + a2f209c commit 8e0c266

12 files changed

Lines changed: 451 additions & 523 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/build.gradle

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@
66
* compatible open source license.
77
*/
88

9-
apply plugin: 'opensearch.internal-cluster-test'
10-
11-
// SQL Unified Query API version (aligned with OpenSearch build version) — required by
12-
// the PPL transport plugin used in CoordinatorReduceIT.
13-
def sqlUnifiedQueryVersion = '3.6.0.0-SNAPSHOT'
14-
159
opensearchplugin {
1610
description = 'DataFusion native execution engine plugin for the query engine.'
1711
classname = 'org.opensearch.be.datafusion.DataFusionPlugin'
@@ -85,25 +79,6 @@ dependencies {
8579
testImplementation project(':sandbox:plugins:analytics-backend-lucene')
8680
testCompileOnly 'org.immutables:value-annotations:2.8.8'
8781

88-
// ── internalClusterTest: end-to-end coordinator-reduce IT ───────────────────
89-
// Pulls in every sibling plugin needed to construct a parquet-backed composite
90-
// index, dispatch a multi-shard PPL aggregate, and exercise DatafusionReduceSink.
91-
internalClusterTestImplementation project(':sandbox:plugins:analytics-engine')
92-
internalClusterTestImplementation project(':sandbox:plugins:parquet-data-format')
93-
internalClusterTestImplementation project(':sandbox:plugins:composite-engine')
94-
internalClusterTestImplementation project(':sandbox:plugins:analytics-backend-lucene')
95-
internalClusterTestImplementation project(':plugins:arrow-flight-rpc')
96-
internalClusterTestImplementation("org.opensearch.query:unified-query-api:${sqlUnifiedQueryVersion}") {
97-
exclude group: 'org.opensearch'
98-
}
99-
internalClusterTestImplementation("org.opensearch.query:unified-query-core:${sqlUnifiedQueryVersion}") {
100-
exclude group: 'org.opensearch'
101-
}
102-
internalClusterTestImplementation("org.opensearch.query:unified-query-ppl:${sqlUnifiedQueryVersion}") {
103-
exclude group: 'org.opensearch'
104-
}
105-
// PPL front-end plugin — provides UnifiedPPLExecuteAction transport action used by the IT.
106-
internalClusterTestImplementation project(':sandbox:plugins:test-ppl-frontend')
10782
}
10883

10984
test {
@@ -169,30 +144,10 @@ task cargoTest(type: Exec) {
169144

170145
check.dependsOn cargoTest
171146

172-
internalClusterTest {
173-
jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
174-
jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED'
175-
jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED'
176-
jvmArgs '--enable-native-access=ALL-UNNAMED'
177-
jvmArgs '-Darrow.memory.debug.allocator=false'
178-
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
179-
systemProperty 'io.netty.allocator.numDirectArenas', '1'
180-
systemProperty 'io.netty.noUnsafe', 'false'
181-
systemProperty 'io.netty.tryUnsafe', 'true'
182-
systemProperty 'io.netty.tryReflectionSetAccessible', 'true'
183-
systemProperty 'native.lib.path', project(':sandbox:libs:dataformat-native').ext.nativeLibPath.absolutePath
184-
dependsOn ':sandbox:libs:dataformat-native:buildRustLibrary'
185-
}
186-
187147
configurations.all {
188-
// okhttp-aws-signer is a transitive dep of unified-query-common (via unified-query-core),
189-
// only published on JitPack, not needed for PPL parsing/planning
190148
exclude group: 'com.github.babbel', module: 'okhttp-aws-signer'
191149

192150
resolutionStrategy {
193-
// Align transitive versions with OpenSearch's managed versions — required because the
194-
// unified-query-* artifacts pull in older versions of common libs that conflict with
195-
// OpenSearch's enforced versions on internalClusterTest classpath.
196151
force 'com.google.guava:guava:33.4.0-jre'
197152
force 'com.google.guava:failureaccess:1.0.2'
198153
force 'com.google.errorprone:error_prone_annotations:2.36.0'

sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ pub struct QueryStreamHandle {
6969
/// Held for its `Drop` impl — marks the query completed when the
7070
/// stream is closed.
7171
_query_tracking_context: QueryTrackingContext,
72+
/// Keeps the SessionContext alive while the stream is being consumed.
73+
/// The physical plan may reference state (e.g. RuntimeEnv, caches) owned
74+
/// by the session; dropping it prematurely causes use-after-free.
75+
_session_ctx: Option<datafusion::prelude::SessionContext>,
7276
}
7377

7478
impl QueryStreamHandle {
@@ -79,6 +83,19 @@ impl QueryStreamHandle {
7983
Self {
8084
stream,
8185
_query_tracking_context: query_context,
86+
_session_ctx: None,
87+
}
88+
}
89+
90+
pub fn with_session_context(
91+
stream: RecordBatchStreamAdapter<CrossRtStream>,
92+
query_context: QueryTrackingContext,
93+
ctx: datafusion::prelude::SessionContext,
94+
) -> Self {
95+
Self {
96+
stream,
97+
_query_tracking_context: query_context,
98+
_session_ctx: Some(ctx),
8299
}
83100
}
84101
}
@@ -632,10 +649,15 @@ pub unsafe fn sender_send(
632649

633650
// `from_ffi` takes the array by value (consumes it) and the schema by
634651
// reference (it is still dropped when `ffi_schema` goes out of scope).
635-
let array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
652+
let mut array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
636653
DataFusionError::Execution(format!("Failed to import Arrow C Data array: {}", e))
637654
})?;
638655

656+
// Buffers from Java's Flight RPC deserialization may not meet Rust's
657+
// native alignment requirements. align_buffers() is a no-op for
658+
// already-aligned buffers; only misaligned ones are reallocated.
659+
array_data.align_buffers();
660+
639661
let struct_array = StructArray::from(array_data);
640662
let batch = RecordBatch::from(struct_array);
641663

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -624,28 +624,17 @@ pub unsafe extern "C" fn df_execute_with_context(
624624
plan_ptr: *const u8,
625625
plan_len: i64,
626626
) -> i64 {
627-
// Consume the session context handle on entry. Ownership transfers here
628-
// regardless of whether the remainder of this function succeeds, returns an
629-
// error via `?`, or panics — RAII (or `catch_unwind` drop-during-unwind)
630-
// drops `session_handle` and frees the underlying SessionContext resources.
631-
//
632-
// This matches the Java-side contract: SessionContextHandle.markConsumed() is
633-
// invoked in a `finally` after the FFM downcall, so every observable path from
634-
// Java's perspective ("call.invoke ran") maps to "Rust consumed the handle".
635-
// If we were to run fallible or panic-prone code (e.g. `get_rt_manager()?`)
636-
// before Box::from_raw, the handle would leak on those paths.
637627
let session_handle = *Box::from_raw(session_ctx_ptr as *mut crate::session_context::SessionContextHandle);
638628

639629
let mgr = get_rt_manager()?;
640630
let plan_bytes = slice::from_raw_parts(plan_ptr, plan_len as usize);
641631
let cpu_executor = mgr.cpu_executor();
642632

643-
// Route based on whether the session was configured for indexed execution
644-
let handle_ref = &*(session_ctx_ptr as *const crate::session_context::SessionContextHandle);
645-
if handle_ref.indexed_config.is_some() {
633+
if session_handle.indexed_config.is_some() {
634+
let ptr = Box::into_raw(Box::new(session_handle)) as i64;
646635
mgr.io_runtime
647636
.block_on(crate::indexed_executor::execute_indexed_with_context(
648-
session_ctx_ptr,
637+
ptr,
649638
plan_bytes.to_vec(),
650639
cpu_executor,
651640
))

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,6 @@ pub async unsafe fn execute_indexed_with_context(
692692
let cross_rt_stream = CrossRtStream::new_with_df_error_stream(df_stream, cpu_executor);
693693
let schema = cross_rt_stream.schema();
694694
let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream);
695-
let stream_handle = crate::api::QueryStreamHandle::new(wrapped, query_context);
695+
let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, query_context, ctx);
696696
Ok(Box::into_raw(Box::new(stream_handle)) as i64)
697697
}

sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,6 @@ pub async fn execute_with_context(
185185
cross_rt_stream,
186186
);
187187

188-
let stream_handle = crate::api::QueryStreamHandle::new(wrapped, handle.query_context);
188+
let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, handle.query_context, handle.ctx);
189189
Ok(Box::into_raw(Box::new(stream_handle)) as i64)
190190
}

sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java

Lines changed: 0 additions & 164 deletions
This file was deleted.

0 commit comments

Comments
 (0)