
Commit 81ceea9

[spark] Spark write supports 'full-compaction.delta-commits' (#6364)
1 parent 60454a1 commit 81ceea9

10 files changed

Lines changed: 210 additions & 34 deletions
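
The change is exercised through the existing 'full-compaction.delta-commits' table option. A minimal sketch of driving it from a Spark batch write, assuming a SparkSession with a Paimon catalog is already configured; the table name and schema are illustrative, not taken from this commit:

  // Primary-key table with the option set; after this commit every batch INSERT
  // commit also triggers a full compaction of the buckets it wrote to.
  spark.sql(
    """CREATE TABLE t (k INT, v STRING) TBLPROPERTIES (
      |  'primary-key' = 'k',
      |  'full-compaction.delta-commits' = '1'
      |)""".stripMargin)

  spark.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")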


docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
@@ -534,7 +534,7 @@
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>Full compaction will be constantly triggered after delta commits.</td>
+            <td>For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.</td>
         </tr>
         <tr>
             <td><h5>ignore-delete</h5></td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 7 additions & 1 deletion
@@ -1204,7 +1204,8 @@ public InlineElement getDescription() {
                     .intType()
                     .noDefaultValue()
                     .withDescription(
-                            "Full compaction will be constantly triggered after delta commits.");
+                            "For streaming write, full compaction will be constantly triggered after delta commits. "
+                                    + "For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.");

     @ExcludeFromDocumentation("Internal use only")
     public static final ConfigOption<StreamScanMode> STREAM_SCAN_MODE =
@@ -2788,6 +2789,11 @@ public String consumerId() {
         return consumerId;
     }

+    @Nullable
+    public Integer fullCompactionDeltaCommits() {
+        return options.get(FULL_COMPACTION_DELTA_COMMITS);
+    }
+
     public static StreamingReadMode streamReadType(Options options) {
         return options.get(STREAMING_READ_MODE);
     }

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 1 addition & 3 deletions
@@ -60,7 +60,6 @@
 import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
@@ -563,8 +562,7 @@ private static void validateBucket(TableSchema schema, CoreOptions options) {
                     "Cannot define 'bucket-key' with bucket = -1, please remove the 'bucket-key' setting or specify a bucket number.");
         }

-        if (schema.primaryKeys().isEmpty()
-                && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+        if (schema.primaryKeys().isEmpty() && options.fullCompactionDeltaCommits() != null) {
             throw new RuntimeException(
                     "AppendOnlyTable of unaware or dynamic bucket does not support 'full-compaction.delta-commits'");
         }
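
The validation now reads the option through the new typed accessor instead of the raw options map; the behaviour it guards is unchanged. A hedged sketch of the configuration it rejects (DDL illustrative, not from this commit):

  // Append-only table (no primary key, unaware bucket by default): setting the option
  // fails at schema validation with the message quoted above.
  spark.sql(
    """CREATE TABLE append_t (k INT, v STRING) TBLPROPERTIES (
      |  'full-compaction.delta-commits' = '1'
      |)""".stripMargin)
  // RuntimeException: AppendOnlyTable of unaware or dynamic bucket does not support
  // 'full-compaction.delta-commits'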

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala

Lines changed: 13 additions & 9 deletions
@@ -20,11 +20,11 @@ package org.apache.paimon.spark

 import org.apache.paimon.disk.IOManager
 import org.apache.paimon.spark.util.SparkRowUtils
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.spark.write.DataWriteHelper
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer, TableWriteImpl}
 import org.apache.paimon.types.RowType

-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.Row

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -33,33 +33,37 @@ case class SparkTableWrite(
     writeBuilder: BatchWriteBuilder,
     writeType: RowType,
     rowKindColIdx: Int = -1,
-    writeRowTracking: Boolean = false)
-  extends SparkTableWriteTrait {
+    writeRowTracking: Boolean = false,
+    fullCompactionDeltaCommits: Option[Int],
+    batchId: Long)
+  extends SparkTableWriteTrait
+  with DataWriteHelper {

   private val ioManager: IOManager = SparkUtils.createIOManager

-  private val write: BatchTableWrite = {
+  val write: TableWriteImpl[Row] = {
     val _write = writeBuilder.newWrite()
     _write.withIOManager(ioManager)
     if (writeRowTracking) {
       _write.withWriteType(writeType)
     }
-    _write
+    _write.asInstanceOf[TableWriteImpl[Row]]
   }

   private val toPaimonRow = {
     SparkRowUtils.toPaimonRow(writeType, rowKindColIdx)
   }

   def write(row: Row): Unit = {
-    write.write(toPaimonRow(row))
+    postWrite(write.writeAndReturn(toPaimonRow(row)))
   }

   def write(row: Row, bucket: Int): Unit = {
-    write.write(toPaimonRow(row), bucket)
+    postWrite(write.writeAndReturn(toPaimonRow(row), bucket))
   }

   def finish(): Iterator[Array[Byte]] = {
+    preFinish()
     var bytesWritten = 0L
     var recordsWritten = 0L
     val commitMessages = new ListBuffer[Array[Byte]]()

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 14 additions & 2 deletions
@@ -51,13 +51,19 @@ import java.util.Collections.singletonMap

 import scala.collection.JavaConverters._

-case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = false)
+case class PaimonSparkWriter(
+    table: FileStoreTable,
+    writeRowTracking: Boolean = false,
+    batchId: Long = -1)
   extends WriteHelper {

   private lazy val tableSchema = table.schema

   private lazy val bucketMode = table.bucketMode

+  private val fullCompactionDeltaCommits: Option[Int] =
+    Option.apply(coreOptions.fullCompactionDeltaCommits())
+
   @transient private lazy val serializer = new CommitMessageSerializer

   private val writeType = {
@@ -98,7 +104,13 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
     val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
     val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)

-    def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowTracking)
+    def newWrite() = SparkTableWrite(
+      writeBuilder,
+      writeType,
+      rowKindColIdx,
+      writeRowTracking,
+      fullCompactionDeltaCommits,
+      batchId)

     def sparkParallelism = {
       val defaultParallelism = sparkSession.sparkContext.defaultParallelism

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala

Lines changed: 3 additions & 2 deletions
@@ -36,7 +36,8 @@ case class WriteIntoPaimonTable(
     override val originTable: FileStoreTable,
     saveMode: SaveMode,
     _data: DataFrame,
-    options: Options)
+    options: Options,
+    batchId: Long = -1)
   extends RunnableCommand
   with ExpressionHelper
   with SchemaHelper
@@ -50,7 +51,7 @@ case class WriteIntoPaimonTable(
     updateTableWithOptions(
       Map(DYNAMIC_PARTITION_OVERWRITE.key -> dynamicPartitionOverwriteMode.toString))

-    val writer = PaimonSparkWriter(table)
+    val writer = PaimonSparkWriter(table, batchId = batchId)
     if (overwritePartition != null) {
       writer.writeBuilder.withOverwrite(overwritePartition.asJava)
     }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala

Lines changed: 2 additions & 2 deletions
@@ -43,8 +43,8 @@ class PaimonSink(
     } else {
       InsertInto
     }
-    partitionColumns.foreach(println)
     val newData = PaimonUtils.createNewDataFrame(data)
-    WriteIntoPaimonTable(originTable, saveMode, newData, options).run(sqlContext.sparkSession)
+    WriteIntoPaimonTable(originTable, saveMode, newData, options, batchId).run(
+      sqlContext.sparkSession)
   }
 }
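
PaimonSink.addBatch receives the micro-batch id from Spark and now forwards it through WriteIntoPaimonTable into PaimonSparkWriter, so a streaming query compacts every N commits rather than on every commit. A hedged sketch of a streaming write, assuming the path-based format("paimon") sink; the source DataFrame, path, and checkpoint location are illustrative and not part of this diff:

  // With 'full-compaction.delta-commits' = '5' on the table, micro-batches with
  // ids 4, 9, 14, ... (every 5th commit, ids are 0-based) also run a full compaction.
  val query = inputStream.writeStream // inputStream: any streaming DataFrame matching the table schema
    .format("paimon")
    .option("checkpointLocation", "/tmp/paimon-checkpoint")
    .start("/path/to/warehouse/db.db/t")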

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.table.sink.{BatchTableWrite, SinkRecord}
+
+import org.apache.spark.internal.Logging
+
+import scala.collection.mutable
+
+trait DataWriteHelper extends Logging {
+
+  val write: BatchTableWrite
+
+  val fullCompactionDeltaCommits: Option[Int]
+
+  /**
+   * For batch write, batchId is -1; for streaming write, batchId is the current batch id (>= 0).
+   */
+  val batchId: Long
+
+  private val needFullCompaction: Boolean = {
+    fullCompactionDeltaCommits match {
+      case Some(deltaCommits) =>
+        deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 0)
+      case None => false
+    }
+  }
+
+  private val writtenBuckets = mutable.Set[(BinaryRow, Integer)]()
+
+  def postWrite(record: SinkRecord): Unit = {
+    if (record == null) {
+      return
+    }
+
+    if (needFullCompaction && !writtenBuckets.contains((record.partition(), record.bucket()))) {
+      writtenBuckets.add((record.partition().copy(), record.bucket()))
+    }
+  }
+
+  def preFinish(): Unit = {
+    if (needFullCompaction && writtenBuckets.nonEmpty) {
+      logInfo("Start to compact buckets: " + writtenBuckets)
+      writtenBuckets.foreach(
+        (bucket: (BinaryRow, Integer)) => {
+          write.compact(bucket._1, bucket._2, true)
+        })
+    }
+  }
+}
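
The trigger condition above is the core of the helper: a batch write (batchId == -1) compacts on every commit as long as the option is positive, while a streaming write compacts on every deltaCommits-th micro-batch. The same arithmetic, extracted into a standalone sketch for illustration only:

  // Mirrors needFullCompaction from the trait above.
  def shouldFullCompact(deltaCommits: Option[Int], batchId: Long): Boolean =
    deltaCommits.exists(n => n > 0 && (batchId == -1 || (batchId + 1) % n == 0))

  shouldFullCompact(Some(3), batchId = -1) // true:  batch write, option > 0
  shouldFullCompact(Some(3), batchId = 2)  // true:  third micro-batch (ids 0, 1, 2)
  shouldFullCompact(Some(3), batchId = 3)  // false: next trigger is batch id 5
  shouldFullCompact(None, batchId = -1)    // false: option not set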

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala

Lines changed: 22 additions & 14 deletions
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
 import org.apache.paimon.spark.commands.SchemaHelper
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage, CommitMessageSerializer}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl}

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
@@ -103,8 +103,11 @@ private case class PaimonBatchWrite(
     builder
   }

-  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
-    WriterFactory(writeSchema, dataSchema, batchWriteBuilder)
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+    val fullCompactionDeltaCommits: Option[Int] =
+      Option.apply(coreOptions.fullCompactionDeltaCommits())
+    WriterFactory(writeSchema, dataSchema, batchWriteBuilder, fullCompactionDeltaCommits)
+  }

   override def useCommitCoordinator(): Boolean = false

@@ -139,23 +142,27 @@ private case class PaimonBatchWrite(
 private case class WriterFactory(
     writeSchema: StructType,
     dataSchema: StructType,
-    batchWriteBuilder: BatchWriteBuilder)
+    batchWriteBuilder: BatchWriteBuilder,
+    fullCompactionDeltaCommits: Option[Int])
   extends DataWriterFactory {

   override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    val batchTableWrite = batchWriteBuilder.newWrite()
-    new PaimonDataWriter(batchTableWrite, writeSchema, dataSchema)
+    val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
+    PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits)
   }
 }

-private class PaimonDataWriter(
-    batchTableWrite: BatchTableWrite,
+private case class PaimonDataWriter(
+    write: TableWriteImpl[InternalRow],
     writeSchema: StructType,
-    dataSchema: StructType)
-  extends DataWriter[InternalRow] {
+    dataSchema: StructType,
+    fullCompactionDeltaCommits: Option[Int],
+    batchId: Long = -1)
+  extends DataWriter[InternalRow]
+  with DataWriteHelper {

   private val ioManager = SparkUtils.createIOManager()
-  batchTableWrite.withIOManager(ioManager)
+  write.withIOManager(ioManager)

   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
@@ -164,12 +171,13 @@ private class PaimonDataWriter(
   }

   override def write(record: InternalRow): Unit = {
-    batchTableWrite.write(rowConverter.apply(record))
+    postWrite(write.writeAndReturn(rowConverter.apply(record)))
   }

   override def commit(): WriterCommitMessage = {
     try {
-      val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+      preFinish()
+      val commitMessages = write.prepareCommit().asScala.toSeq
       TaskCommit(commitMessages)
     } finally {
       close()
@@ -180,7 +188,7 @@ private class PaimonDataWriter(

   override def close(): Unit = {
     try {
-      batchTableWrite.close()
+      write.close()
       ioManager.close()
     } catch {
       case e: Exception => throw new RuntimeException(e)
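
The DataSourceV2 path gets the same behaviour: PaimonDataWriter keeps batchId at its default of -1, so a positive 'full-compaction.delta-commits' makes every V2 batch commit compact the buckets it touched, and commit() runs preFinish() before prepareCommit() so the compaction result lands in the same commit. A hedged usage sketch via Spark's DataFrameWriterV2 API; catalog, database, and table names are illustrative:

  // df is any DataFrame matching the table schema; the append is planned through PaimonV2Write.
  df.writeTo("paimon.db.t").append()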
