Skip to content

Commit 53e2fa9

Browse files
fix for head + offset queries, which need compaction of variable-length arrow arrays before passing via FFI
Signed-off-by: bharath-techie <bharath78910@gmail.com>
1 parent bede4c9 commit 53e2fa9

1 file changed

Lines changed: 65 additions & 1 deletion

File tree

  • plugins/engine-datafusion/jni/src

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use jni::objects::JLongArray;
1717
use jni::sys::{jboolean, jbyteArray, jint, jlong, jstring};
1818
use jni::{JNIEnv, JavaVM};
1919
use std::sync::{Arc, OnceLock};
20-
use arrow_array::{Array, StructArray};
20+
use arrow_array::{Array, RecordBatch, StructArray};
2121
use arrow_array::ffi::FFI_ArrowArray;
2222
use arrow_schema::ffi::FFI_ArrowSchema;
2323
use datafusion::{
@@ -651,6 +651,57 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_fetchSegm
651651
});
652652
}
653653

654+
use arrow_array::{ArrayRef, StringArray, BinaryArray, LargeStringArray, LargeBinaryArray};
655+
use arrow_schema::DataType;
656+
657+
/// Check if a variable-length array has non-zero starting offset.
658+
#[inline]
659+
fn has_nonzero_offset(col: &ArrayRef) -> bool {
660+
match col.data_type() {
661+
DataType::Utf8 | DataType::Binary => {
662+
if col.offset() > 0 { return true; }
663+
col.to_data().buffers().first()
664+
.map(|b| b.len() >= 4 && unsafe { *(b.as_ptr() as *const i32) } != 0)
665+
.unwrap_or(false)
666+
}
667+
DataType::LargeUtf8 | DataType::LargeBinary => {
668+
if col.offset() > 0 { return true; }
669+
col.to_data().buffers().first()
670+
.map(|b| b.len() >= 8 && unsafe { *(b.as_ptr() as *const i64) } != 0)
671+
.unwrap_or(false)
672+
}
673+
_ => false
674+
}
675+
}
676+
677+
/// Compact a variable-length array by rebuilding with offsets starting at 0.
678+
#[inline]
679+
fn compact_array(col: &ArrayRef) -> ArrayRef {
680+
match col.data_type() {
681+
DataType::Utf8 => Arc::new(col.as_any().downcast_ref::<StringArray>().unwrap().iter().collect::<StringArray>()),
682+
DataType::LargeUtf8 => Arc::new(col.as_any().downcast_ref::<LargeStringArray>().unwrap().iter().collect::<LargeStringArray>()),
683+
DataType::Binary => Arc::new(col.as_any().downcast_ref::<BinaryArray>().unwrap().iter().collect::<BinaryArray>()),
684+
DataType::LargeBinary => Arc::new(col.as_any().downcast_ref::<LargeBinaryArray>().unwrap().iter().collect::<LargeBinaryArray>()),
685+
_ => Arc::clone(col)
686+
}
687+
}
688+
689+
/// Normalize batch for FFI. Zero-alloc fast path when no compaction needed.
690+
#[inline]
691+
fn maybe_normalize_for_ffi(batch: RecordBatch) -> Result<RecordBatch, DataFusionError> {
692+
// Fast path check: any column need compaction?
693+
if !batch.columns().iter().any(has_nonzero_offset) {
694+
return Ok(batch);
695+
}
696+
697+
// Slow path: rebuild - Arc::clone is just refcount bump, unavoidable for new RecordBatch
698+
let columns: Vec<ArrayRef> = batch.columns().iter()
699+
.map(|col| if has_nonzero_offset(col) { compact_array(col) } else { Arc::clone(col) })
700+
.collect();
701+
702+
RecordBatch::try_new(batch.schema(), columns)
703+
.map_err(|e| DataFusionError::ArrowError(Box::from(e), None))
704+
}
654705

655706
#[no_mangle]
656707
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNext(
@@ -704,6 +755,19 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNex
704755
with_jni_env(|env| {
705756
match result {
706757
Ok(Some(batch)) => {
758+
759+
// Normalize only if batch contains sliced arrays (offset > 0)
760+
// This is necessary because sliced Arrow arrays don't translate
761+
// correctly across FFI boundaries
762+
let batch = match maybe_normalize_for_ffi(batch) {
763+
Ok(b) => b,
764+
Err(e) => {
765+
log_error!("Failed to normalize batch for FFI: {}", e);
766+
set_action_listener_error_global(env, &listener_ref, &e);
767+
return;
768+
}
769+
};
770+
707771
// Convert to FFI
708772
let struct_array: StructArray = batch.into();
709773
let array_data = struct_array.into_data();

0 commit comments

Comments
 (0)