diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f2e90e5dcef1..42c574d60477 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -534,7 +534,7 @@
full-compaction.delta-commits
(none) Integer - Full compaction will be constantly triggered after delta commits. + For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.
ignore-delete
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e2046da9262a..44e4c48f34ce 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1204,7 +1204,8 @@ public InlineElement getDescription() { .intType() .noDefaultValue() .withDescription( - "Full compaction will be constantly triggered after delta commits."); + "For streaming write, full compaction will be constantly triggered after delta commits. " + + "For batch write, full compaction will be triggered with each commit as long as this value is greater than 0."); @ExcludeFromDocumentation("Internal use only") public static final ConfigOption STREAM_SCAN_MODE = @@ -2788,6 +2789,11 @@ public String consumerId() { return consumerId; } + @Nullable + public Integer fullCompactionDeltaCommits() { + return options.get(FULL_COMPACTION_DELTA_COMMITS); + } + public static StreamingReadMode streamReadType(Options options) { return options.get(STREAMING_READ_MODE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 905d8ad64749..a05d198ffa99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -60,7 +60,6 @@ import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; -import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; @@ -563,8 +562,7 @@ private static void validateBucket(TableSchema schema, CoreOptions options) { "Cannot define 'bucket-key' with bucket = -1, please remove the 'bucket-key' setting or specify a bucket number."); } - if (schema.primaryKeys().isEmpty() - && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) { + if (schema.primaryKeys().isEmpty() && options.fullCompactionDeltaCommits() != null) { throw new RuntimeException( "AppendOnlyTable of unaware or dynamic bucket does not support 'full-compaction.delta-commits'"); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala index 704a6c2d5bde..5169e6ba764b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala @@ -20,11 +20,11 @@ package org.apache.paimon.spark import org.apache.paimon.disk.IOManager import org.apache.paimon.spark.util.SparkRowUtils -import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer} +import org.apache.paimon.spark.write.DataWriteHelper +import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer, TableWriteImpl} import org.apache.paimon.types.RowType -import org.apache.spark.TaskContext -import org.apache.spark.sql.{PaimonUtils, Row} +import org.apache.spark.sql.Row import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -33,18 +33,21 @@ case class SparkTableWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, rowKindColIdx: Int = -1, - writeRowTracking: Boolean = false) - extends SparkTableWriteTrait { + writeRowTracking: Boolean = false, + fullCompactionDeltaCommits: Option[Int], + batchId: Long) + extends SparkTableWriteTrait + with DataWriteHelper { private val ioManager: IOManager = SparkUtils.createIOManager - private val write: BatchTableWrite = { + val write: TableWriteImpl[Row] = { val _write = writeBuilder.newWrite() _write.withIOManager(ioManager) if (writeRowTracking) { _write.withWriteType(writeType) } - _write + _write.asInstanceOf[TableWriteImpl[Row]] } private val toPaimonRow = { @@ -52,14 +55,15 @@ case class SparkTableWrite( } def write(row: Row): Unit = { - write.write(toPaimonRow(row)) + postWrite(write.writeAndReturn(toPaimonRow(row))) } def write(row: Row, bucket: Int): Unit = { - write.write(toPaimonRow(row), bucket) + postWrite(write.writeAndReturn(toPaimonRow(row), bucket)) } def finish(): Iterator[Array[Byte]] = { + preFinish() var bytesWritten = 0L var recordsWritten = 0L val commitMessages = new ListBuffer[Array[Byte]]() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 246245c0527a..0260df1db728 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -51,13 +51,19 @@ import java.util.Collections.singletonMap import scala.collection.JavaConverters._ -case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = false) +case class PaimonSparkWriter( + table: FileStoreTable, + writeRowTracking: Boolean = false, + batchId: Long = -1) extends WriteHelper { private lazy val tableSchema = table.schema private lazy val bucketMode = table.bucketMode + private val fullCompactionDeltaCommits: Option[Int] = + Option.apply(coreOptions.fullCompactionDeltaCommits()) + @transient private lazy val serializer = new CommitMessageSerializer private val writeType = { @@ -98,7 +104,13 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL) val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema) - def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowTracking) + def newWrite() = SparkTableWrite( + writeBuilder, + writeType, + rowKindColIdx, + writeRowTracking, + fullCompactionDeltaCommits, + batchId) def sparkParallelism = { val defaultParallelism = sparkSession.sparkContext.defaultParallelism diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index 5a9b36e269eb..2056590ead18 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -36,7 +36,8 @@ case class WriteIntoPaimonTable( override val originTable: FileStoreTable, saveMode: SaveMode, _data: DataFrame, - options: Options) + options: Options, + batchId: Long = -1) extends RunnableCommand with ExpressionHelper with SchemaHelper @@ -50,7 +51,7 @@ case class WriteIntoPaimonTable( updateTableWithOptions( Map(DYNAMIC_PARTITION_OVERWRITE.key -> dynamicPartitionOverwriteMode.toString)) - val writer = PaimonSparkWriter(table) + val writer = PaimonSparkWriter(table, batchId = batchId) if (overwritePartition != null) { writer.writeBuilder.withOverwrite(overwritePartition.asJava) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala index 3387a536ab6a..1c51208089dd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala @@ -43,8 +43,8 @@ class PaimonSink( } else { InsertInto } - partitionColumns.foreach(println) val newData = PaimonUtils.createNewDataFrame(data) - WriteIntoPaimonTable(originTable, saveMode, newData, options).run(sqlContext.sparkSession) + WriteIntoPaimonTable(originTable, saveMode, newData, options, batchId).run( + sqlContext.sparkSession) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala new file mode 100644 index 000000000000..6334eebe69b7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write + +import org.apache.paimon.data.BinaryRow +import org.apache.paimon.table.sink.{BatchTableWrite, SinkRecord} + +import org.apache.spark.internal.Logging + +import scala.collection.mutable + +trait DataWriteHelper extends Logging { + + val write: BatchTableWrite + + val fullCompactionDeltaCommits: Option[Int] + + /** + * For batch write, batchId is -1, for streaming write, batchId is the current batch id (>= 0). + */ + val batchId: Long + + private val needFullCompaction: Boolean = { + fullCompactionDeltaCommits match { + case Some(deltaCommits) => + deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 0) + case None => false + } + } + + private val writtenBuckets = mutable.Set[(BinaryRow, Integer)]() + + def postWrite(record: SinkRecord): Unit = { + if (record == null) { + return + } + + if (needFullCompaction && !writtenBuckets.contains((record.partition(), record.bucket()))) { + writtenBuckets.add((record.partition().copy(), record.bucket())) + } + } + + def preFinish(): Unit = { + if (needFullCompaction && writtenBuckets.nonEmpty) { + logInfo("Start to compact buckets: " + writtenBuckets) + writtenBuckets.foreach( + (bucket: (BinaryRow, Integer)) => { + write.compact(bucket._1, bucket._2, true) + }) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala index 9eaa1bf72f26..62a383eb7c62 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala @@ -23,7 +23,7 @@ import org.apache.paimon.options.Options import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils} import org.apache.paimon.spark.commands.SchemaHelper import org.apache.paimon.table.FileStoreTable -import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage, CommitMessageSerializer} +import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow @@ -103,8 +103,11 @@ private case class PaimonBatchWrite( builder } - override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = - WriterFactory(writeSchema, dataSchema, batchWriteBuilder) + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { + val fullCompactionDeltaCommits: Option[Int] = + Option.apply(coreOptions.fullCompactionDeltaCommits()) + WriterFactory(writeSchema, dataSchema, batchWriteBuilder, fullCompactionDeltaCommits) + } override def useCommitCoordinator(): Boolean = false @@ -139,23 +142,27 @@ private case class PaimonBatchWrite( private case class WriterFactory( writeSchema: StructType, dataSchema: StructType, - batchWriteBuilder: BatchWriteBuilder) + batchWriteBuilder: BatchWriteBuilder, + fullCompactionDeltaCommits: Option[Int]) extends DataWriterFactory { override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { - val batchTableWrite = batchWriteBuilder.newWrite() - new PaimonDataWriter(batchTableWrite, writeSchema, dataSchema) + val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]] + PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits) } } -private class PaimonDataWriter( - batchTableWrite: BatchTableWrite, +private case class PaimonDataWriter( + write: TableWriteImpl[InternalRow], writeSchema: StructType, - dataSchema: StructType) - extends DataWriter[InternalRow] { + dataSchema: StructType, + fullCompactionDeltaCommits: Option[Int], + batchId: Long = -1) + extends DataWriter[InternalRow] + with DataWriteHelper { private val ioManager = SparkUtils.createIOManager() - batchTableWrite.withIOManager(ioManager) + write.withIOManager(ioManager) private val rowConverter: InternalRow => SparkInternalRowWrapper = { val numFields = writeSchema.fields.length @@ -164,12 +171,13 @@ private class PaimonDataWriter( } override def write(record: InternalRow): Unit = { - batchTableWrite.write(rowConverter.apply(record)) + postWrite(write.writeAndReturn(rowConverter.apply(record))) } override def commit(): WriterCommitMessage = { try { - val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq + preFinish() + val commitMessages = write.prepareCommit().asScala.toSeq TaskCommit(commitMessages) } finally { close() @@ -180,7 +188,7 @@ private class PaimonDataWriter( override def close(): Unit = { try { - batchTableWrite.close() + write.close() ioManager.close() } catch { case e: Exception => throw new RuntimeException(e) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 61bf5524942d..c43170d7ba1b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -18,6 +18,8 @@ package org.apache.paimon.spark +import org.apache.paimon.Snapshot.CommitKind._ + import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream @@ -283,4 +285,81 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { } } } + + test("Paimon SinK: set full-compaction.delta-commits with batch write") { + for (useV2Write <- Seq("true", "false")) { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) { + withTable("t") { + sql(""" + |CREATE TABLE t ( + | a INT, + | b INT + |) TBLPROPERTIES ( + | 'primary-key'='a', + | 'bucket'='1', + | 'full-compaction.delta-commits'='1' + |) + |""".stripMargin) + + sql("INSERT INTO t VALUES (1, 1)") + sql("INSERT INTO t VALUES (2, 2)") + checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 2))) + assert(loadTable("t").snapshotManager().latestSnapshot().commitKind == COMPACT) + } + } + } + } + + test("Paimon SinK: set full-compaction.delta-commits with streaming write") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ( + | 'primary-key'='a', + | 'bucket'='1', + | 'full-compaction.delta-commits'='2' + |) + |""".stripMargin) + val table = loadTable("T") + val location = table.location().toString + + val inputData = MemoryStream[(Int, Int)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("paimon") + .start(location) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + inputData.addData((1, 1)) + stream.processAllAvailable() + checkAnswer(query(), Seq(Row(1, 1))) + assert(table.snapshotManager().latestSnapshot().commitKind == APPEND) + + inputData.addData((2, 1)) + stream.processAllAvailable() + checkAnswer(query(), Seq(Row(1, 1), Row(2, 1))) + assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT) + + inputData.addData((2, 2)) + stream.processAllAvailable() + checkAnswer(query(), Seq(Row(1, 1), Row(2, 2))) + assert(table.snapshotManager().latestSnapshot().commitKind == APPEND) + + inputData.addData((3, 1)) + stream.processAllAvailable() + checkAnswer(query(), Seq(Row(1, 1), Row(2, 2), Row(3, 1))) + assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT) + } finally { + stream.stop() + } + } + } + } }