diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java index e96a18cdd6ac..e9ddc9e000eb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java @@ -845,7 +845,7 @@ public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception } write.compact(BinaryRow.EMPTY_ROW, 0, false); write.compact(BinaryRow.EMPTY_ROW, 1, false); - commit.commit(commitId, write.prepareCommit(false, commitId)); + commit.commit(commitId, write.prepareCommit(true, commitId)); commitId++; } diff --git a/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java b/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java index 9691f4567059..170c0e81c26a 100644 --- a/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java +++ b/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java @@ -19,6 +19,8 @@ package org.apache.paimon.format.vortex; import org.apache.paimon.arrow.ArrowBundleRecords; +import org.apache.paimon.arrow.ArrowUtils; +import org.apache.paimon.arrow.vector.ArrowCStruct; import org.apache.paimon.arrow.vector.ArrowFormatWriter; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.BundleFormatWriter; @@ -28,6 +30,9 @@ import dev.vortex.api.DType; import dev.vortex.api.VortexWriter; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +40,6 @@ import java.io.IOException; import java.util.Map; -import static org.apache.paimon.arrow.ArrowUtils.serializeToIpc; - /** Vortex records writer. */ public class VortexRecordsWriter implements BundleFormatWriter { @@ -45,7 +48,9 @@ public class VortexRecordsWriter implements BundleFormatWriter { private final ArrowFormatWriter arrowFormatWriter; private final VortexWriter nativeWriter; private final String path; - private long bytesWritten = 0; + private final ArrowArray arrowArray; + private final ArrowSchema arrowSchema; + private final BufferAllocator allocator; private long jniCost = 0; public VortexRecordsWriter( @@ -56,6 +61,9 @@ public VortexRecordsWriter( throws IOException { this.arrowFormatWriter = arrowFormatWriter; this.path = path.toUri().toString(); + this.allocator = arrowFormatWriter.getAllocator(); + this.arrowArray = ArrowArray.allocateNew(allocator); + this.arrowSchema = ArrowSchema.allocateNew(allocator); DType dtype = VortexTypeUtils.toDType(rowType); this.nativeWriter = VortexWriter.create(this.path, dtype, storageOptions); @@ -84,9 +92,9 @@ public void writeBundle(BundleRecords bundleRecords) throws IOException { @Override public boolean reachTargetSize(boolean suggestedCheck, long targetSize) { - // Note: bytesWritten tracks Arrow IPC serialized bytes, not the actual Vortex file size - // (which may differ due to Vortex's own compression/encoding). - return suggestedCheck && (bytesWritten > targetSize); + // Vortex applies its own compression/encoding, so in-memory Arrow size is much larger + // than the actual file size on disk. Always return false to avoid rolling into small files. + return false; } @Override @@ -95,6 +103,8 @@ public void close() throws IOException { LOG.info("Jni cost: {}ms for file: {}", jniCost, path); long t1 = System.currentTimeMillis(); nativeWriter.close(); + arrowArray.close(); + arrowSchema.close(); arrowFormatWriter.close(); long closeCost = (System.currentTimeMillis() - t1); LOG.info("Close cost: {}ms for file: {}", closeCost, path); @@ -109,10 +119,10 @@ private void flush() throws IOException { } private void writeVsr(VectorSchemaRoot vsr) throws IOException { - byte[] arrowData = serializeToIpc(vsr); - bytesWritten += arrowData.length; + ArrowCStruct cStruct = + ArrowUtils.serializeToCStruct(vsr, arrowArray, arrowSchema, allocator); long t1 = System.currentTimeMillis(); - nativeWriter.writeBatch(arrowData); + nativeWriter.writeBatchFfi(cStruct.arrayAddress(), cStruct.schemaAddress()); jniCost += (System.currentTimeMillis() - t1); } } diff --git a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java index 4b2583782151..367739417b31 100644 --- a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java +++ b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java @@ -40,6 +40,8 @@ static VortexWriter create(String uri, DType dtype, Map options) void writeBatch(byte[] arrowData) throws IOException; + void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException; + @Override void close() throws IOException; } diff --git a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java index 260cc8271394..edf02c0bafbe 100644 --- a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java +++ b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/JNIWriter.java @@ -47,6 +47,17 @@ public void writeBatch(byte[] arrowData) throws IOException { } } + @Override + public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException { + logger.trace("Writing batch via FFI with arrayAddr={}, schemaAddr={}", arrowArrayAddr, arrowSchemaAddr); + + boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr); + if (!success) { + logger.error("Failed to write batch via FFI to Vortex file"); + throw new IOException("Failed to write batch via FFI to Vortex file"); + } + } + @Override public void close() { if (!this.ptr.isPresent()) { diff --git a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java index 50a1bd3ed46c..e79cac294e92 100644 --- a/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java +++ b/paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/jni/NativeWriterMethods.java @@ -33,5 +33,8 @@ private NativeWriterMethods() {} public static native boolean writeBatch(long writerPtr, byte[] arrowData); + public static native boolean writeBatchFfi( + long writerPtr, long arrowArrayAddr, long arrowSchemaAddr); + public static native void close(long writerPtr); }