Commit e02f035

[core] spark: support format table write (#6365)
1 parent ae5b7be commit e02f035

8 files changed
Lines changed: 225 additions & 46 deletions

paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java

Lines changed: 2 additions & 0 deletions
@@ -158,6 +158,8 @@ private void uploadPart() throws IOException {

     private static class MultiPartUploadCommitter<T, C> implements Committer {

+        private static final long serialVersionUID = 1L;
+
         private final MultiPartUploadStore<T, C> multiPartUploadStore;
         private final String uploadId;
         private final String objectName;

paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java

Lines changed: 2 additions & 0 deletions
@@ -96,6 +96,8 @@ private Path generateTempPath(Path targetPath) {

     /** Committer implementation that renames temporary file to target path. */
     private static class TempFileCommitter implements Committer {

+        private static final long serialVersionUID = 1L;
+
         private final FileIO fileIO;
         private final Path tempPath;
         private final Path targetPath;

paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java

Lines changed: 2 additions & 1 deletion
@@ -19,6 +19,7 @@
 package org.apache.paimon.fs;

 import java.io.IOException;
+import java.io.Serializable;

 /** TwoPhaseOutputStream provides a way to write to a file and get a committer that can commit. */
 public abstract class TwoPhaseOutputStream extends PositionOutputStream {
@@ -35,7 +36,7 @@ public abstract class TwoPhaseOutputStream extends PositionOutputStream {
     public abstract Committer closeForCommit() throws IOException;

     /** A committer interface that can commit or discard the written data. */
-    public interface Committer {
+    public interface Committer extends Serializable {

        /**
         * Commits the written data, making it visible.
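
Note: the three paimon-common changes above exist because the new Spark write path (last file below) carries committers from executor tasks back to the driver inside Spark's WriterCommitMessage, which is Java-serialized. Hence Committer now extends Serializable, and both implementations pin a serialVersionUID. A minimal sketch of that round trip, using illustrative stand-in names rather than Paimon's real classes:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for TwoPhaseOutputStream.Committer: must be Serializable so it can
// ride inside a WriterCommitMessage from a Spark executor to the driver.
trait Committer extends Serializable {
  def commit(): Unit
  def discard(): Unit
}

// Pinning the serial version (like the new serialVersionUID = 1L fields above)
// keeps serialized committers compatible across differing builds.
@SerialVersionUID(1L)
class RenameCommitter(tempPath: String, targetPath: String) extends Committer {
  override def commit(): Unit = println(s"rename $tempPath -> $targetPath")
  override def discard(): Unit = println(s"delete $tempPath")
}

object CommitterRoundTrip {
  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    new ObjectOutputStream(buffer).writeObject(new RenameCommitter("/tmp/part-0", "/data/part-0"))
    val restored = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject()
      .asInstanceOf[Committer]
    restored.commit() // driver-side commit after the serialization round trip
  }
}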

paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java

Lines changed: 24 additions & 14 deletions
@@ -583,22 +583,27 @@ public void testFormatTableOnlyPartitionValueRead() throws Exception {
         Random random = new Random();
         String dbName = "test_db";
         catalog.createDatabase(dbName, true);
-        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f1", DataTypes.INT());
         schemaBuilder.column("f2", DataTypes.INT());
         schemaBuilder.column("dt", DataTypes.INT());
         schemaBuilder.column("dt2", DataTypes.VARCHAR(64));
         schemaBuilder.partitionKeys("dt", "dt2");
         schemaBuilder.option("type", "format-table");
-        schemaBuilder.option("file.compression", compressionType.value());
         schemaBuilder.option("format-table.partition-path-only-value", "true");
-        String[] formats = {"csv", "parquet", "json"};
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("orc", HadoopCompressionType.ZSTD)
+        };
         int dtPartitionValue = 10;
         String dt2PartitionValue = "2022-01-01";
-        for (String format : formats) {
-            Identifier identifier = Identifier.create(dbName, "partition_table_" + format);
-            schemaBuilder.option("file.format", format);
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+            Identifier identifier =
+                    Identifier.create(dbName, "partition_table_" + format2Compression.getKey());
+            schemaBuilder.option("file.compression", format2Compression.getValue().value());
+            schemaBuilder.option("file.format", format2Compression.getKey());
             catalog.createTable(identifier, schemaBuilder.build(), true);
             FormatTable table = (FormatTable) catalog.getTable(identifier);
             int size = 5;
@@ -619,7 +624,7 @@ public void testFormatTableOnlyPartitionValueRead() throws Exception {
             partitionSpec.put("dt2", dt2PartitionValue + 1);
             List<InternalRow> readFilterData = read(table, null, null, partitionSpec, null);
             assertThat(readFilterData).isEmpty();
-            catalog.dropTable(Identifier.create(dbName, format), true);
+            catalog.dropTable(identifier, true);
         }
     }
@@ -633,21 +638,26 @@ public void testFormatTableReadAndWrite(boolean partitioned) throws Exception {
         String dbName = "test_db";
         catalog.createDatabase(dbName, true);
         int partitionValue = 10;
-        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f1", DataTypes.INT());
         schemaBuilder.column("f2", DataTypes.INT());
         schemaBuilder.column("dt", DataTypes.INT());
         schemaBuilder.option("type", "format-table");
         schemaBuilder.option("target-file-size", "1 kb");
-        schemaBuilder.option("file.compression", compressionType.value());
-        String[] formats = {"csv", "parquet", "json"};
-        for (String format : formats) {
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("orc", HadoopCompressionType.ZSTD)
+        };
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
             if (partitioned) {
                 schemaBuilder.partitionKeys("dt");
             }
-            Identifier identifier = Identifier.create(dbName, "table_" + format);
-            schemaBuilder.option("file.format", format);
+            Identifier identifier =
+                    Identifier.create(dbName, "table_" + format2Compression.getKey());
+            schemaBuilder.option("file.format", format2Compression.getKey());
+            schemaBuilder.option("file.compression", format2Compression.getValue().value());
             catalog.createTable(identifier, schemaBuilder.build(), true);
             FormatTable table = (FormatTable) catalog.getTable(identifier);
             int[] projection = new int[] {1, 2};
@@ -696,7 +706,7 @@ public void testFormatTableReadAndWrite(boolean partitioned) throws Exception {
                            read(table, partitionFilterPredicate, projection, null, null);
                assertThat(readPartitionAndNoPartitionFilterData).hasSize(size);
            }
-            catalog.dropTable(Identifier.create(dbName, format), true);
+            catalog.dropTable(identifier, true);
        }
    }

paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ public RowDataVectorizer(
     @Override
     public void vectorize(InternalRow row, VectorizedRowBatch batch) {
         int rowId = batch.size++;
-        for (int i = 0; i < row.getFieldCount(); ++i) {
+        for (int i = 0; i < fieldNames.length; ++i) {
             ColumnVector fieldColumn = batch.cols[i];
             if (row.isNullAt(i)) {
                 if (!isNullable[i]) {
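
Note: the loop bound now comes from the writer schema (fieldNames.length) instead of the incoming row (row.getFieldCount()). batch.cols holds one ColumnVector per schema field, so a row carrying extra trailing fields, e.g. partition columns stored in the path rather than in the ORC file, would previously index past the end of batch.cols. A self-contained sketch of just that indexing invariant, with made-up data:

// Hypothetical shapes: the batch is sized by the writer schema (2 fields),
// while the row may carry an extra partition field (3 fields).
object LoopBoundSketch {
  def main(args: Array[String]): Unit = {
    val batchCols = Array.ofDim[Long](2) // one slot per schema field: f1, f2
    val row = Array(1L, 2L, 10L)         // f1, f2, dt -> one field too many
    // for (i <- row.indices) batchCols(i) = row(i)    // ArrayIndexOutOfBoundsException
    for (i <- batchCols.indices) batchCols(i) = row(i) // bounded by the schema: OK
    println(batchCols.mkString(", "))
  }
}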

paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java

Lines changed: 7 additions & 0 deletions
@@ -23,6 +23,7 @@
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;

+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -101,6 +102,12 @@ public static Optional<CompressionCodec> getCompressionCodecByCompression(String
             Class<?> codecClass = Class.forName(codecName);
             CompressionCodec codec =
                     (CompressionCodec) codecClass.getDeclaredConstructor().newInstance();
+
+            // Fix NPE when the codec implements Configurable
+            if (codec instanceof Configurable) {
+                ((Configurable) codec).setConf(new Configuration());
+            }
+
             codec.createOutputStream(new java.io.ByteArrayOutputStream());
             return Optional.of(codec);
         } catch (Exception | UnsatisfiedLinkError e) {
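
Note on the Configurable guard: several Hadoop codecs (GzipCodec via DefaultCodec, for example) implement Configurable and dereference their Configuration when building a compressor, so a codec instantiated reflectively without a conf can throw a NullPointerException on createOutputStream. A small sketch of the failure mode and the guard, assuming hadoop-common is on the classpath:

import java.io.ByteArrayOutputStream

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}

object CodecConfSketch {
  def main(args: Array[String]): Unit = {
    // Reflective instantiation, as in getCompressionCodecByCompression above:
    // the codec starts out with a null Configuration.
    val codec: CompressionCodec =
      classOf[GzipCodec].getDeclaredConstructor().newInstance()

    // Without this guard, createOutputStream below can hit a NullPointerException
    // inside the codec when it consults its (null) Configuration.
    codec match {
      case configurable: Configurable => configurable.setConf(new Configuration())
      case _ =>
    }

    codec.createOutputStream(new ByteArrayOutputStream()) // now safe
  }
}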

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala

Lines changed: 147 additions & 5 deletions
@@ -18,17 +18,22 @@

 package org.apache.spark.sql.execution

-import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkTypeUtils}
+import org.apache.paimon.fs.TwoPhaseOutputStream
+import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkInternalRowWrapper, SparkTypeUtils}
 import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.format.{FormatBatchWriteBuilder, TwoPhaseCommitMessage}
+import org.apache.paimon.table.sink.BatchTableWrite

 import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -186,15 +191,15 @@ case class PaimonFormatTable(
   }

   override def capabilities(): util.Set[TableCapability] = {
-    util.EnumSet.of(BATCH_READ)
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE)
   }

   override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
     PaimonFormatTableScanBuilder(table.copy(caseInsensitiveStringMap), schema, Seq.empty)
   }

   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
-    throw new UnsupportedOperationException()
+    PaimonFormatTableWriterBuilder(table, schema)
   }
 }
@@ -297,3 +302,140 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      committers.foreach(_.commit())
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    val committers = messages.collect {
+      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+    }.flatten
+
+    committers.foreach {
+      committer =>
+        try {
+          committer.discard()
+        } catch {
+          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+        }
+    }
+  }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+  }
+}
+
+private class FormatTableDataWriter(
+    table: FormatTable,
+    formatTableWrite: BatchTableWrite,
+    writeSchema: StructType)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+    val numFields = writeSchema.fields.length
+    record => {
+      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+    }
+  }
+
+  override def write(record: InternalRow): Unit = {
+    val paimonRow = rowConverter.apply(record)
+    formatTableWrite.write(paimonRow)
+  }
+
+  override def commit(): WriterCommitMessage = {
+    try {
+      val committers = formatTableWrite
+        .prepareCommit()
+        .asScala
+        .map {
+          case committer: TwoPhaseCommitMessage => committer.getCommitter
+          case other =>
+            throw new IllegalArgumentException(
+              "Unsupported commit message type: " + other.getClass.getSimpleName)
+        }
+        .toSeq
+      FormatTableTaskCommit(committers)
+    } finally {
+      close()
+    }
+  }
+
+  override def abort(): Unit = {
+    logInfo("Aborting FormatTable data writer")
+    close()
+  }
+
+  override def close(): Unit = {
+    try {
+      formatTableWrite.close()
+    } catch {
+      case e: Exception =>
+        logError("Error closing FormatTableDataWriter", e)
+        throw new RuntimeException(e)
+    }
+  }
+}
+
+/** Commit message container for FormatTable writes, holding committers that need to be executed. */
+class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+  extends WriterCommitMessage {
+
+  def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+}
+
+object FormatTableTaskCommit {
+  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
+    new FormatTableTaskCommit(committers)
+  }
+}
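
Note: with BATCH_WRITE declared and the classes above in place, a plain INSERT into a format table flows through PaimonFormatTableWriterBuilder: each task's FormatTableDataWriter wraps Spark rows via SparkInternalRowWrapper, turns its prepareCommit() result into a FormatTableTaskCommit, and the driver-side FormatTableBatchWrite.commit() runs every two-phase committer. A hypothetical end-to-end use (catalog, database, and table names are illustrative; the table is assumed to already exist as a format table):

import org.apache.spark.sql.SparkSession

// Assumes a SparkSession with a Paimon catalog registered under "paimon".
val spark = SparkSession.builder().getOrCreate()

// Executors write data files through FormatTableDataWriter ...
spark.sql("INSERT INTO paimon.test_db.fmt_table VALUES (1, 2, 10)")

// ... and once the driver has committed, the rows are visible to readers.
spark.sql("SELECT * FROM paimon.test_db.fmt_table").show()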
