diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java b/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java
index 9714399b739..9bc5dfd0cf3 100644
--- a/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java
+++ b/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java
@@ -42,6 +42,18 @@ static VortexWriter create(String uri, DType dtype, Map options)
      */
     void writeBatch(byte[] arrowData) throws IOException;
 
+    /**
+     * Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
+     * <p>
+     * This avoids the IPC serialization overhead by accepting raw memory addresses
+     * of the {@code ArrowArray} and {@code ArrowSchema} structs.
+     *
+     * @param arrowArrayAddr memory address of the {@code ArrowArray} struct
+     * @param arrowSchemaAddr memory address of the {@code ArrowSchema} struct
+     * @throws IOException if writing fails
+     */
+    void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException;
+
     /**
      * Closes the writer and finalizes the Vortex file.
      * <p>
diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java b/java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java
index 7fb69248808..6b0c62ca091 100644
--- a/java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java
+++ b/java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java
@@ -48,6 +48,31 @@ public void writeBatch(byte[] arrowData) throws IOException {
         }
     }
 
+    /**
+     * Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
+     *
+     * @param arrowArrayAddr memory address of the ArrowArray struct; must be non-zero
+     * @param arrowSchemaAddr memory address of the ArrowSchema struct; must be non-zero
+     * @throws IllegalArgumentException if either address is zero
+     * @throws IOException if writing fails
+     */
+    @Override
+    public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException {
+        // Reject null pointers before crossing into native code, which dereferences both addresses.
+        if (arrowArrayAddr == 0L || arrowSchemaAddr == 0L) {
+            throw new IllegalArgumentException("Arrow C Data Interface addresses must be non-zero (arrayAddr="
+                    + arrowArrayAddr + ", schemaAddr=" + arrowSchemaAddr + ")");
+        }
+
+        logger.trace("Writing batch via FFI (arrayAddr={}, schemaAddr={})", arrowArrayAddr, arrowSchemaAddr);
+
+        boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr);
+        if (!success) {
+            logger.error("Failed to write FFI batch to Vortex file");
+            throw new IOException("Failed to write FFI batch to Vortex file");
+        }
+    }
+
     /**
      * Closes the writer and finalizes the Vortex file.
      *
diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java
index 1cb5c543427..8ddc7ce0bb1 100644
--- a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java
+++ b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java
@@ -35,6 +35,16 @@ private NativeWriterMethods() {}
      */
     public static native boolean writeBatch(long writerPtr, byte[] arrowData);
 
+    /**
+     * Writes a batch of Arrow data to the Vortex file directly from Arrow C Data Interface pointers.
+     *
+     * @param writerPtr the native writer pointer
+     * @param arrowArrayAddr memory address of the ArrowArray struct
+     * @param arrowSchemaAddr memory address of the ArrowSchema struct
+     * @return true if successful, false otherwise
+     */
+    public static native boolean writeBatchFfi(long writerPtr, long arrowArrayAddr, long arrowSchemaAddr);
+
     /**
      * Close and flush the writer, finalizing it to the storage system.
      *
diff --git a/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java b/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
index 44a0a247047..c534f273c87 100644
--- a/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
+++ b/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
@@ -3,16 +3,31 @@
 
 package dev.vortex.jni;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import dev.vortex.api.DType;
+import dev.vortex.api.ScanOptions;
 import dev.vortex.api.VortexWriter;
+import dev.vortex.arrow.ArrowAllocation;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -58,4 +73,68 @@ public void testCreateWriter() throws IOException {
assertTrue(Files.exists(outputPath), "Output file should exist"); System.err.println("File created at: " + outputPath); } + + @Test + public void testWriteBatchFfi() throws IOException { + Path outputPath = tempDir.resolve("test_ffi.vortex"); + String writePath = outputPath.toAbsolutePath().toUri().toString(); + + var writeSchema = DType.newStruct( + new String[] {"name", "age"}, + new DType[] {DType.newUtf8(false), DType.newInt(false)}, + false); + + BufferAllocator allocator = ArrowAllocation.rootAllocator(); + + Schema arrowSchema = new Schema(java.util.List.of( + new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null), + new Field("age", FieldType.notNullable(new ArrowType.Int(32, true)), null))); + + try (VortexWriter writer = VortexWriter.create(writePath, writeSchema, new HashMap<>())) { + // Build a batch with Arrow Java + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + VarCharVector nameVec = (VarCharVector) root.getVector("name"); + IntVector ageVec = (IntVector) root.getVector("age"); + + nameVec.allocateNew(3); + ageVec.allocateNew(3); + + nameVec.setSafe(0, "Alice".getBytes(UTF_8)); + nameVec.setSafe(1, "Bob".getBytes(UTF_8)); + nameVec.setSafe(2, "Carol".getBytes(UTF_8)); + ageVec.setSafe(0, 30); + ageVec.setSafe(1, 25); + ageVec.setSafe(2, 40); + + root.setRowCount(3); + + // Export to C Data Interface + try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator); + ArrowSchema arrowSchemaFfi = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchemaFfi); + + writer.writeBatchFfi(arrowArray.memoryAddress(), arrowSchemaFfi.memoryAddress()); + } + } + } + + assertTrue(Files.exists(outputPath), "Output file should exist"); + + // Read back and verify + try (var file = dev.vortex.api.Files.open(outputPath.toAbsolutePath().toString()); + var scan = file.newScan(ScanOptions.of())) { + assertEquals(3, file.rowCount()); + + var batch = scan.next(); + 
var nameField = batch.getField(0); + var ageField = batch.getField(1); + + assertEquals("Alice", nameField.getUTF8(0)); + assertEquals("Bob", nameField.getUTF8(1)); + assertEquals("Carol", nameField.getUTF8(2)); + assertEquals(30, ageField.getInt(0)); + assertEquals(25, ageField.getInt(1)); + assertEquals(40, ageField.getInt(2)); + } + } } diff --git a/vortex-jni/src/writer.rs b/vortex-jni/src/writer.rs index 6b2e1778bea..4254ddc24a8 100644 --- a/vortex-jni/src/writer.rs +++ b/vortex-jni/src/writer.rs @@ -4,6 +4,9 @@ use std::io::Cursor; use arrow_array::RecordBatch; +use arrow_array::StructArray; +use arrow_array::ffi::FFI_ArrowArray; +use arrow_array::ffi::FFI_ArrowSchema; use arrow_ipc::reader::StreamReader; use futures::SinkExt; use futures::channel::mpsc; @@ -226,6 +229,40 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatch<'local }) } +/// Writes a batch to the Vortex file directly from Arrow C Data Interface pointers. +/// +/// This avoids the IPC serialization/deserialization overhead of `writeBatch` by accepting +/// raw Arrow FFI pointers directly. +#[unsafe(no_mangle)] +pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatchFfi<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + writer_ptr: jlong, + arrow_array_addr: jlong, + arrow_schema_addr: jlong, +) -> jboolean { + if writer_ptr <= 0 { + return JNI_FALSE; + } + + try_or_throw(&mut env, |_env| { + let writer = unsafe { NativeWriter::from_ptr(writer_ptr) }; + + // Reconstruct FFI structs from the raw pointers provided by Java. 
+ let ffi_array = + unsafe { FFI_ArrowArray::from_raw(arrow_array_addr as *mut FFI_ArrowArray) }; + let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) }; + + let array_data = unsafe { arrow_array::ffi::from_ffi(ffi_array, ffi_schema) } + .map_err(|e| JNIError::Vortex(vortex_err!("Failed to import Arrow FFI data: {}", e)))?; + + let batch = RecordBatch::from(StructArray::from(array_data)); + writer.write_record_batch(batch)?; + + Ok(JNI_TRUE) + }) +} + /// Closes the writer #[unsafe(no_mangle)] pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_close<'local>(