Skip to content

Commit 66aea7d

Browse files
authored
[vortex] Use nativeWriter.writeBatchFfi in VortexRecordsWriter (#7567)
1 parent d312694 commit 66aea7d

5 files changed

Lines changed: 36 additions & 10 deletions

File tree

paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception
845845
}
846846
write.compact(BinaryRow.EMPTY_ROW, 0, false);
847847
write.compact(BinaryRow.EMPTY_ROW, 1, false);
848-
commit.commit(commitId, write.prepareCommit(false, commitId));
848+
commit.commit(commitId, write.prepareCommit(true, commitId));
849849
commitId++;
850850
}
851851

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.paimon.format.vortex;
2020

2121
import org.apache.paimon.arrow.ArrowBundleRecords;
22+
import org.apache.paimon.arrow.ArrowUtils;
23+
import org.apache.paimon.arrow.vector.ArrowCStruct;
2224
import org.apache.paimon.arrow.vector.ArrowFormatWriter;
2325
import org.apache.paimon.data.InternalRow;
2426
import org.apache.paimon.format.BundleFormatWriter;
@@ -28,15 +30,16 @@
2830

2931
import dev.vortex.api.DType;
3032
import dev.vortex.api.VortexWriter;
33+
import org.apache.arrow.c.ArrowArray;
34+
import org.apache.arrow.c.ArrowSchema;
35+
import org.apache.arrow.memory.BufferAllocator;
3136
import org.apache.arrow.vector.VectorSchemaRoot;
3237
import org.slf4j.Logger;
3338
import org.slf4j.LoggerFactory;
3439

3540
import java.io.IOException;
3641
import java.util.Map;
3742

38-
import static org.apache.paimon.arrow.ArrowUtils.serializeToIpc;
39-
4043
/** Vortex records writer. */
4144
public class VortexRecordsWriter implements BundleFormatWriter {
4245

@@ -45,7 +48,9 @@ public class VortexRecordsWriter implements BundleFormatWriter {
4548
private final ArrowFormatWriter arrowFormatWriter;
4649
private final VortexWriter nativeWriter;
4750
private final String path;
48-
private long bytesWritten = 0;
51+
private final ArrowArray arrowArray;
52+
private final ArrowSchema arrowSchema;
53+
private final BufferAllocator allocator;
4954
private long jniCost = 0;
5055

5156
public VortexRecordsWriter(
@@ -56,6 +61,9 @@ public VortexRecordsWriter(
5661
throws IOException {
5762
this.arrowFormatWriter = arrowFormatWriter;
5863
this.path = path.toUri().toString();
64+
this.allocator = arrowFormatWriter.getAllocator();
65+
this.arrowArray = ArrowArray.allocateNew(allocator);
66+
this.arrowSchema = ArrowSchema.allocateNew(allocator);
5967

6068
DType dtype = VortexTypeUtils.toDType(rowType);
6169
this.nativeWriter = VortexWriter.create(this.path, dtype, storageOptions);
@@ -84,9 +92,9 @@ public void writeBundle(BundleRecords bundleRecords) throws IOException {
8492

8593
@Override
8694
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) {
87-
// Note: bytesWritten tracks Arrow IPC serialized bytes, not the actual Vortex file size
88-
// (which may differ due to Vortex's own compression/encoding).
89-
return suggestedCheck && (bytesWritten > targetSize);
95+
// Vortex applies its own compression/encoding, so in-memory Arrow size is much larger
96+
// than the actual file size on disk. Always return false to avoid rolling into small files.
97+
return false;
9098
}
9199

92100
@Override
@@ -95,6 +103,8 @@ public void close() throws IOException {
95103
LOG.info("Jni cost: {}ms for file: {}", jniCost, path);
96104
long t1 = System.currentTimeMillis();
97105
nativeWriter.close();
106+
arrowArray.close();
107+
arrowSchema.close();
98108
arrowFormatWriter.close();
99109
long closeCost = (System.currentTimeMillis() - t1);
100110
LOG.info("Close cost: {}ms for file: {}", closeCost, path);
@@ -109,10 +119,10 @@ private void flush() throws IOException {
109119
}
110120

111121
private void writeVsr(VectorSchemaRoot vsr) throws IOException {
112-
byte[] arrowData = serializeToIpc(vsr);
113-
bytesWritten += arrowData.length;
122+
ArrowCStruct cStruct =
123+
ArrowUtils.serializeToCStruct(vsr, arrowArray, arrowSchema, allocator);
114124
long t1 = System.currentTimeMillis();
115-
nativeWriter.writeBatch(arrowData);
125+
nativeWriter.writeBatchFfi(cStruct.arrayAddress(), cStruct.schemaAddress());
116126
jniCost += (System.currentTimeMillis() - t1);
117127
}
118128
}

paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ static VortexWriter create(String uri, DType dtype, Map<String, String> options)
4040

4141
void writeBatch(byte[] arrowData) throws IOException;
4242

43+
void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException;
44+
4345
@Override
4446
void close() throws IOException;
4547
}

paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@ public void writeBatch(byte[] arrowData) throws IOException {
4747
}
4848
}
4949

50+
@Override
51+
public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException {
52+
logger.trace("Writing batch via FFI with arrayAddr={}, schemaAddr={}", arrowArrayAddr, arrowSchemaAddr);
53+
54+
boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr);
55+
if (!success) {
56+
logger.error("Failed to write batch via FFI to Vortex file");
57+
throw new IOException("Failed to write batch via FFI to Vortex file");
58+
}
59+
}
60+
5061
@Override
5162
public void close() {
5263
if (!this.ptr.isPresent()) {

paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,8 @@ private NativeWriterMethods() {}
3333

3434
public static native boolean writeBatch(long writerPtr, byte[] arrowData);
3535

36+
public static native boolean writeBatchFfi(
37+
long writerPtr, long arrowArrayAddr, long arrowSchemaAddr);
38+
3639
public static native void close(long writerPtr);
3740
}

0 commit comments

Comments
 (0)