diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f2e90e5dcef1..42c574d60477 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -534,7 +534,7 @@
full-compaction.delta-commits |
(none) |
Integer |
- Full compaction will be constantly triggered after delta commits. |
+ For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0. |
ignore-delete |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2046da9262a..44e4c48f34ce 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1204,7 +1204,8 @@ public InlineElement getDescription() {
.intType()
.noDefaultValue()
.withDescription(
- "Full compaction will be constantly triggered after delta commits.");
+ "For streaming write, full compaction will be constantly triggered after delta commits. "
+ + "For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.");
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption STREAM_SCAN_MODE =
@@ -2788,6 +2789,11 @@ public String consumerId() {
return consumerId;
}
+ /**
+ * Returns the configured value of 'full-compaction.delta-commits', or null when the option is
+ * not set.
+ */
+ @Nullable
+ public Integer fullCompactionDeltaCommits() {
+ return options.get(FULL_COMPACTION_DELTA_COMMITS);
+ }
+
public static StreamingReadMode streamReadType(Options options) {
return options.get(STREAMING_READ_MODE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 905d8ad64749..a05d198ffa99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -60,7 +60,6 @@
import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
@@ -563,8 +562,7 @@ private static void validateBucket(TableSchema schema, CoreOptions options) {
"Cannot define 'bucket-key' with bucket = -1, please remove the 'bucket-key' setting or specify a bucket number.");
}
- if (schema.primaryKeys().isEmpty()
- && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+ if (schema.primaryKeys().isEmpty() && options.fullCompactionDeltaCommits() != null) {
throw new RuntimeException(
"AppendOnlyTable of unaware or dynamic bucket does not support 'full-compaction.delta-commits'");
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 704a6c2d5bde..5169e6ba764b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark
import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.util.SparkRowUtils
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.spark.write.DataWriteHelper
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer, TableWriteImpl}
import org.apache.paimon.types.RowType
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.Row
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -33,18 +33,21 @@ case class SparkTableWrite(
writeBuilder: BatchWriteBuilder,
writeType: RowType,
rowKindColIdx: Int = -1,
- writeRowTracking: Boolean = false)
- extends SparkTableWriteTrait {
+ writeRowTracking: Boolean = false,
+ fullCompactionDeltaCommits: Option[Int],
+ batchId: Long)
+ extends SparkTableWriteTrait
+ with DataWriteHelper {
private val ioManager: IOManager = SparkUtils.createIOManager
- private val write: BatchTableWrite = {
+ val write: TableWriteImpl[Row] = {
val _write = writeBuilder.newWrite()
_write.withIOManager(ioManager)
if (writeRowTracking) {
_write.withWriteType(writeType)
}
- _write
+ _write.asInstanceOf[TableWriteImpl[Row]]
}
private val toPaimonRow = {
@@ -52,14 +55,15 @@ case class SparkTableWrite(
}
def write(row: Row): Unit = {
- write.write(toPaimonRow(row))
+ postWrite(write.writeAndReturn(toPaimonRow(row)))
}
def write(row: Row, bucket: Int): Unit = {
- write.write(toPaimonRow(row), bucket)
+ postWrite(write.writeAndReturn(toPaimonRow(row), bucket))
}
def finish(): Iterator[Array[Byte]] = {
+ preFinish()
var bytesWritten = 0L
var recordsWritten = 0L
val commitMessages = new ListBuffer[Array[Byte]]()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 246245c0527a..0260df1db728 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -51,13 +51,19 @@ import java.util.Collections.singletonMap
import scala.collection.JavaConverters._
-case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = false)
+case class PaimonSparkWriter(
+ table: FileStoreTable,
+ writeRowTracking: Boolean = false,
+ batchId: Long = -1)
extends WriteHelper {
private lazy val tableSchema = table.schema
private lazy val bucketMode = table.bucketMode
+ private val fullCompactionDeltaCommits: Option[Int] =
+ Option.apply(coreOptions.fullCompactionDeltaCommits())
+
@transient private lazy val serializer = new CommitMessageSerializer
private val writeType = {
@@ -98,7 +104,13 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
- def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowTracking)
+ def newWrite() = SparkTableWrite(
+ writeBuilder,
+ writeType,
+ rowKindColIdx,
+ writeRowTracking,
+ fullCompactionDeltaCommits,
+ batchId)
def sparkParallelism = {
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 5a9b36e269eb..2056590ead18 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -36,7 +36,8 @@ case class WriteIntoPaimonTable(
override val originTable: FileStoreTable,
saveMode: SaveMode,
_data: DataFrame,
- options: Options)
+ options: Options,
+ batchId: Long = -1)
extends RunnableCommand
with ExpressionHelper
with SchemaHelper
@@ -50,7 +51,7 @@ case class WriteIntoPaimonTable(
updateTableWithOptions(
Map(DYNAMIC_PARTITION_OVERWRITE.key -> dynamicPartitionOverwriteMode.toString))
- val writer = PaimonSparkWriter(table)
+ val writer = PaimonSparkWriter(table, batchId = batchId)
if (overwritePartition != null) {
writer.writeBuilder.withOverwrite(overwritePartition.asJava)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 3387a536ab6a..1c51208089dd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -43,8 +43,8 @@ class PaimonSink(
} else {
InsertInto
}
- partitionColumns.foreach(println)
val newData = PaimonUtils.createNewDataFrame(data)
- WriteIntoPaimonTable(originTable, saveMode, newData, options).run(sqlContext.sparkSession)
+ WriteIntoPaimonTable(originTable, saveMode, newData, options, batchId).run(
+ sqlContext.sparkSession)
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
new file mode 100644
index 000000000000..6334eebe69b7
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.table.sink.{BatchTableWrite, SinkRecord}
+
+import org.apache.spark.internal.Logging
+
+import scala.collection.mutable
+
+/**
+ * Mixin for Spark data writers that remembers every (partition, bucket) a record was written to
+ * and, when 'full-compaction.delta-commits' requires it, compacts those buckets right before the
+ * writer finishes.
+ */
+trait DataWriteHelper extends Logging {
+
+  /** Table write the compaction requests are issued on. */
+  val write: BatchTableWrite
+
+  /** Configured 'full-compaction.delta-commits', or None when the option is not set. */
+  val fullCompactionDeltaCommits: Option[Int]
+
+  /**
+   * For batch write, batchId is -1, for streaming write, batchId is the current batch id (>= 0).
+   */
+  val batchId: Long
+
+  /**
+   * True when the buckets written by this writer must be compacted in [[preFinish]]: the option
+   * is set to a positive value and this is either a batch write or a streaming batch whose
+   * (batchId + 1) is a multiple of the configured delta commits.
+   *
+   * Deliberately lazy: a trait body is initialized before the body of the implementing class,
+   * so an eager val would read null / 0 from any implementation that defines the abstract vals
+   * above in its class body instead of as constructor parameters.
+   */
+  private lazy val needFullCompaction: Boolean =
+    fullCompactionDeltaCommits.exists {
+      deltaCommits => deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 0)
+    }
+
+  /** (partition, bucket) pairs written so far; only filled when a compaction is due. */
+  private val writtenBuckets = mutable.Set[(BinaryRow, Integer)]()
+
+  /**
+   * Records the partition and bucket of a just-written record so they can be compacted later.
+   *
+   * @param record
+   *   result of the write call; a null record is ignored
+   */
+  def postWrite(record: SinkRecord): Unit = {
+    if (record == null) {
+      return
+    }
+
+    // Look up with the record's own partition row and copy it only on first sight of the bucket.
+    // NOTE(review): the copy presumably guards against the row being reused by the writer.
+    if (needFullCompaction && !writtenBuckets.contains((record.partition(), record.bucket()))) {
+      writtenBuckets.add((record.partition().copy(), record.bucket()))
+    }
+  }
+
+  /** Compacts every recorded bucket; the writers call this before preparing commit messages. */
+  def preFinish(): Unit = {
+    if (needFullCompaction && writtenBuckets.nonEmpty) {
+      logInfo("Start to compact buckets: " + writtenBuckets)
+      writtenBuckets.foreach {
+        case (partition, bucket) => write.compact(partition, bucket, true)
+      }
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 9eaa1bf72f26..62a383eb7c62 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
import org.apache.paimon.spark.commands.SchemaHelper
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage, CommitMessageSerializer}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
@@ -103,8 +103,11 @@ private case class PaimonBatchWrite(
builder
}
- override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
- WriterFactory(writeSchema, dataSchema, batchWriteBuilder)
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+ val fullCompactionDeltaCommits: Option[Int] =
+ Option.apply(coreOptions.fullCompactionDeltaCommits())
+ WriterFactory(writeSchema, dataSchema, batchWriteBuilder, fullCompactionDeltaCommits)
+ }
override def useCommitCoordinator(): Boolean = false
@@ -139,23 +142,27 @@ private case class PaimonBatchWrite(
+/**
+ * Factory for the per-task [[PaimonDataWriter]]; carries the optional
+ * 'full-compaction.delta-commits' value through to every writer it creates.
+ */
private case class WriterFactory(
writeSchema: StructType,
dataSchema: StructType,
- batchWriteBuilder: BatchWriteBuilder)
+ batchWriteBuilder: BatchWriteBuilder,
+ fullCompactionDeltaCommits: Option[Int])
extends DataWriterFactory {
override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
- val batchTableWrite = batchWriteBuilder.newWrite()
- new PaimonDataWriter(batchTableWrite, writeSchema, dataSchema)
+ // NOTE(review): the cast assumes newWrite() always yields a TableWriteImpl — confirm.
+ val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
+ PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits)
}
}
-private class PaimonDataWriter(
- batchTableWrite: BatchTableWrite,
+private case class PaimonDataWriter(
+ write: TableWriteImpl[InternalRow],
writeSchema: StructType,
- dataSchema: StructType)
- extends DataWriter[InternalRow] {
+ dataSchema: StructType,
+ fullCompactionDeltaCommits: Option[Int],
+ batchId: Long = -1)
+ extends DataWriter[InternalRow]
+ with DataWriteHelper {
private val ioManager = SparkUtils.createIOManager()
- batchTableWrite.withIOManager(ioManager)
+ write.withIOManager(ioManager)
private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
@@ -164,12 +171,13 @@ private class PaimonDataWriter(
}
override def write(record: InternalRow): Unit = {
- batchTableWrite.write(rowConverter.apply(record))
+ postWrite(write.writeAndReturn(rowConverter.apply(record)))
}
override def commit(): WriterCommitMessage = {
try {
- val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+ preFinish()
+ val commitMessages = write.prepareCommit().asScala.toSeq
TaskCommit(commitMessages)
} finally {
close()
@@ -180,7 +188,7 @@ private class PaimonDataWriter(
override def close(): Unit = {
try {
- batchTableWrite.close()
+ write.close()
ioManager.close()
} catch {
case e: Exception => throw new RuntimeException(e)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 61bf5524942d..c43170d7ba1b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,8 @@
package org.apache.paimon.spark
+import org.apache.paimon.Snapshot.CommitKind._
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -283,4 +285,85 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
       }
     }
   }
+
+  // With 'full-compaction.delta-commits' > 0, a batch INSERT is expected to end with a COMPACT
+  // snapshot; checked for both values of 'spark.paimon.write.use-v2-write'.
+  test("Paimon Sink: set full-compaction.delta-commits with batch write") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+        withTable("t") {
+          sql("""
+                |CREATE TABLE t (
+                |  a INT,
+                |  b INT
+                |) TBLPROPERTIES (
+                |  'primary-key'='a',
+                |  'bucket'='1',
+                |  'full-compaction.delta-commits'='1'
+                |)
+                |""".stripMargin)
+
+          sql("INSERT INTO t VALUES (1, 1)")
+          sql("INSERT INTO t VALUES (2, 2)")
+          checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 2)))
+          assert(loadTable("t").snapshotManager().latestSnapshot().commitKind == COMPACT)
+        }
+      }
+    }
+  }
+
+  // With 'full-compaction.delta-commits' = 2 the streaming sink is expected to compact on every
+  // second micro-batch, so the latest snapshot kind alternates APPEND, COMPACT, APPEND, COMPACT.
+  test("Paimon Sink: set full-compaction.delta-commits with streaming write") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b INT)
+                       |TBLPROPERTIES (
+                       |  'primary-key'='a',
+                       |  'bucket'='1',
+                       |  'full-compaction.delta-commits'='2'
+                       |)
+                       |""".stripMargin)
+          val table = loadTable("T")
+          val location = table.location().toString
+
+          val inputData = MemoryStream[(Int, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == APPEND)
+
+            inputData.addData((2, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT)
+
+            inputData.addData((2, 2))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 2)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == APPEND)
+
+            inputData.addData((3, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 2), Row(3, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }