Skip to content

Commit da5b743

Browse files
committed
feat: Introduce writeBatchFfi in Java JNI to avoid IPC serialization overhead
Signed-off-by: JingsongLi <jingsonglee0@gmail.com>
1 parent e8cd130 commit da5b743

5 files changed

Lines changed: 157 additions & 0 deletions

File tree

java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ static VortexWriter create(String uri, DType dtype, Map<String, String> options)
4242
*/
4343
void writeBatch(byte[] arrowData) throws IOException;
4444

45+
/**
46+
* Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
47+
* <p>
48+
* This avoids the IPC serialization overhead by accepting raw memory addresses
49+
* of ArrowArray and ArrowSchema structs.
50+
*
51+
* @param arrowArrayAddr memory address of the ArrowArray struct
52+
* @param arrowSchemaAddr memory address of the ArrowSchema struct
53+
* @throws IOException if writing fails
54+
*/
55+
void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException;
56+
4557
/**
4658
* Closes the writer and finalizes the Vortex file.
4759
* <p>

java/vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,24 @@ public void writeBatch(byte[] arrowData) throws IOException {
4848
}
4949
}
5050

51+
/**
52+
* Writes a batch of Arrow data directly from Arrow C Data Interface pointers.
53+
*
54+
* @param arrowArrayAddr memory address of the ArrowArray struct
55+
* @param arrowSchemaAddr memory address of the ArrowSchema struct
56+
* @throws IOException if writing fails
57+
*/
58+
@Override
59+
public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException {
60+
logger.trace("Writing batch via FFI (arrayAddr={}, schemaAddr={})", arrowArrayAddr, arrowSchemaAddr);
61+
62+
boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr);
63+
if (!success) {
64+
logger.error("Failed to write FFI batch to Vortex file");
65+
throw new IOException("Failed to write FFI batch to Vortex file");
66+
}
67+
}
68+
5169
/**
5270
* Closes the writer and finalizes the Vortex file.
5371
*

java/vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ private NativeWriterMethods() {}
3535
*/
3636
public static native boolean writeBatch(long writerPtr, byte[] arrowData);
3737

38+
/**
39+
* Writes a batch of Arrow data to the Vortex file directly from Arrow C Data Interface pointers.
40+
*
41+
* @param writerPtr the native writer pointer
42+
* @param arrowArrayAddr memory address of the ArrowArray struct
43+
* @param arrowSchemaAddr memory address of the ArrowSchema struct
44+
* @return true if successful, false otherwise
45+
*/
46+
public static native boolean writeBatchFfi(long writerPtr, long arrowArrayAddr, long arrowSchemaAddr);
47+
3848
/**
3949
* Close and flush the writer, finalizing it to the storage system.
4050
*

java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,32 @@
33

44
package dev.vortex.jni;
55

6+
import static org.junit.jupiter.api.Assertions.assertEquals;
67
import static org.junit.jupiter.api.Assertions.assertNotNull;
78
import static org.junit.jupiter.api.Assertions.assertTrue;
89

910
import dev.vortex.api.DType;
11+
import dev.vortex.api.ScanOptions;
1012
import dev.vortex.api.VortexWriter;
13+
import dev.vortex.arrow.ArrowAllocation;
14+
import static java.nio.charset.StandardCharsets.UTF_8;
15+
1116
import java.io.IOException;
1217
import java.nio.file.Files;
1318
import java.nio.file.Path;
1419
import java.util.HashMap;
1520
import java.util.Map;
21+
import org.apache.arrow.c.ArrowArray;
22+
import org.apache.arrow.c.ArrowSchema;
23+
import org.apache.arrow.c.Data;
24+
import org.apache.arrow.memory.BufferAllocator;
25+
import org.apache.arrow.vector.IntVector;
26+
import org.apache.arrow.vector.VarCharVector;
27+
import org.apache.arrow.vector.VectorSchemaRoot;
28+
import org.apache.arrow.vector.types.pojo.ArrowType;
29+
import org.apache.arrow.vector.types.pojo.Field;
30+
import org.apache.arrow.vector.types.pojo.FieldType;
31+
import org.apache.arrow.vector.types.pojo.Schema;
1632
import org.junit.jupiter.api.BeforeAll;
1733
import org.junit.jupiter.api.Test;
1834
import org.junit.jupiter.api.io.TempDir;
@@ -58,4 +74,68 @@ public void testCreateWriter() throws IOException {
5874
assertTrue(Files.exists(outputPath), "Output file should exist");
5975
System.err.println("File created at: " + outputPath);
6076
}
77+
78+
@Test
79+
public void testWriteBatchFfi() throws IOException {
80+
Path outputPath = tempDir.resolve("test_ffi.vortex");
81+
String writePath = outputPath.toAbsolutePath().toUri().toString();
82+
83+
var writeSchema = DType.newStruct(
84+
new String[] {"name", "age"},
85+
new DType[] {DType.newUtf8(false), DType.newInt(false)},
86+
false);
87+
88+
BufferAllocator allocator = ArrowAllocation.rootAllocator();
89+
90+
Schema arrowSchema = new Schema(java.util.List.of(
91+
new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null),
92+
new Field("age", FieldType.notNullable(new ArrowType.Int(32, true)), null)));
93+
94+
try (VortexWriter writer = VortexWriter.create(writePath, writeSchema, new HashMap<>())) {
95+
// Build a batch with Arrow Java
96+
try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
97+
VarCharVector nameVec = (VarCharVector) root.getVector("name");
98+
IntVector ageVec = (IntVector) root.getVector("age");
99+
100+
nameVec.allocateNew(3);
101+
ageVec.allocateNew(3);
102+
103+
nameVec.setSafe(0, "Alice".getBytes(UTF_8));
104+
nameVec.setSafe(1, "Bob".getBytes(UTF_8));
105+
nameVec.setSafe(2, "Carol".getBytes(UTF_8));
106+
ageVec.setSafe(0, 30);
107+
ageVec.setSafe(1, 25);
108+
ageVec.setSafe(2, 40);
109+
110+
root.setRowCount(3);
111+
112+
// Export to C Data Interface
113+
try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
114+
ArrowSchema arrowSchemaFfi = ArrowSchema.allocateNew(allocator)) {
115+
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchemaFfi);
116+
117+
writer.writeBatchFfi(arrowArray.memoryAddress(), arrowSchemaFfi.memoryAddress());
118+
}
119+
}
120+
}
121+
122+
assertTrue(Files.exists(outputPath), "Output file should exist");
123+
124+
// Read back and verify
125+
try (var file = dev.vortex.api.Files.open(outputPath.toAbsolutePath().toString());
126+
var scan = file.newScan(ScanOptions.of())) {
127+
assertEquals(3, file.rowCount());
128+
129+
var batch = scan.next();
130+
var nameField = batch.getField(0);
131+
var ageField = batch.getField(1);
132+
133+
assertEquals("Alice", nameField.getUTF8(0));
134+
assertEquals("Bob", nameField.getUTF8(1));
135+
assertEquals("Carol", nameField.getUTF8(2));
136+
assertEquals(30, ageField.getInt(0));
137+
assertEquals(25, ageField.getInt(1));
138+
assertEquals(40, ageField.getInt(2));
139+
}
140+
}
61141
}

vortex-jni/src/writer.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
use std::io::Cursor;
55

66
use arrow_array::RecordBatch;
7+
use arrow_array::StructArray;
8+
use arrow_array::ffi::FFI_ArrowArray;
9+
use arrow_array::ffi::FFI_ArrowSchema;
710
use arrow_ipc::reader::StreamReader;
811
use futures::SinkExt;
912
use futures::channel::mpsc;
@@ -226,6 +229,40 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatch<'local
226229
})
227230
}
228231

232+
/// Writes a batch to the Vortex file directly from Arrow C Data Interface pointers.
233+
///
234+
/// This avoids the IPC serialization/deserialization overhead of `writeBatch` by accepting
235+
/// raw Arrow FFI pointers directly.
236+
#[unsafe(no_mangle)]
237+
pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_writeBatchFfi<'local>(
238+
mut env: JNIEnv<'local>,
239+
_class: JClass<'local>,
240+
writer_ptr: jlong,
241+
arrow_array_addr: jlong,
242+
arrow_schema_addr: jlong,
243+
) -> jboolean {
244+
if writer_ptr <= 0 {
245+
return JNI_FALSE;
246+
}
247+
248+
try_or_throw(&mut env, |_env| {
249+
let writer = unsafe { NativeWriter::from_ptr(writer_ptr) };
250+
251+
// Reconstruct FFI structs from the raw pointers provided by Java.
252+
let ffi_array =
253+
unsafe { FFI_ArrowArray::from_raw(arrow_array_addr as *mut FFI_ArrowArray) };
254+
let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) };
255+
256+
let array_data = unsafe { arrow_array::ffi::from_ffi(ffi_array, ffi_schema) }
257+
.map_err(|e| JNIError::Vortex(vortex_err!("Failed to import Arrow FFI data: {}", e)))?;
258+
259+
let batch = RecordBatch::from(StructArray::from(array_data));
260+
writer.write_record_batch(batch)?;
261+
262+
Ok(JNI_TRUE)
263+
})
264+
}
265+
229266
/// Closes the writer
230267
#[unsafe(no_mangle)]
231268
pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_close<'local>(

0 commit comments

Comments
 (0)