Skip to content

Commit 9df6045

Browse files
committed
[core] Fix in RowDataFileWriter
1 parent 4780564 commit 9df6045

File tree

4 files changed

+81
-13
lines changed

4 files changed

+81
-13
lines changed

paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.manifest.FileSource;
2626
import org.apache.paimon.stats.SimpleStats;
2727
import org.apache.paimon.stats.SimpleStatsConverter;
28+
import org.apache.paimon.table.SpecialFields;
2829
import org.apache.paimon.types.RowType;
2930
import org.apache.paimon.utils.LongCounter;
3031
import org.apache.paimon.utils.Pair;
@@ -52,6 +53,10 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
5253
@Nullable private final DataFileIndexWriter dataFileIndexWriter;
5354
private final FileSource fileSource;
5455
@Nullable private final List<String> writeCols;
56+
private final int seqNumberFieldIndex;
57+
private long minSeqNumber;
58+
private long maxSeqNumber;
59+
private boolean hasNullSeqNumber;
5560

5661
public RowDataFileWriter(
5762
FileIO fileIO,
@@ -76,6 +81,10 @@ public RowDataFileWriter(
7681
fileIO, dataFileToFileIndexPath(path), writeSchema, fileIndexOptions);
7782
this.fileSource = fileSource;
7883
this.writeCols = writeCols;
84+
this.seqNumberFieldIndex = writeSchema.getFieldIndex(SpecialFields.SEQUENCE_NUMBER.name());
85+
this.minSeqNumber = Long.MAX_VALUE;
86+
this.maxSeqNumber = Long.MIN_VALUE;
87+
this.hasNullSeqNumber = false;
7988
}
8089

8190
@Override
@@ -85,7 +94,7 @@ public void write(InternalRow row) throws IOException {
8594
if (dataFileIndexWriter != null) {
8695
dataFileIndexWriter.write(row);
8796
}
88-
seqNumCounter.add(1L);
97+
updateSeqNumber(row);
8998
}
9099

91100
@Override
@@ -111,8 +120,8 @@ public DataFileMeta result() throws IOException {
111120
fileSize,
112121
recordCount(),
113122
statsPair.getRight(),
114-
seqNumCounter.getValue() - super.recordCount(),
115-
seqNumCounter.getValue() - 1,
123+
minSeqNumber(),
124+
maxSeqNumber(),
116125
schemaId,
117126
indexResult.independentIndexFile() == null
118127
? Collections.emptyList()
@@ -124,4 +133,37 @@ public DataFileMeta result() throws IOException {
124133
null,
125134
writeCols);
126135
}
136+
137+
private long minSeqNumber() {
138+
if (seqNumberFieldIndex == -1) {
139+
return seqNumCounter.getValue() - super.recordCount();
140+
}
141+
// minSeqNumber stays at Long.MAX_VALUE when all records have null sequence numbers.
142+
// Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID.
143+
return minSeqNumber == Long.MAX_VALUE ? 0 : minSeqNumber;
144+
}
145+
146+
private long maxSeqNumber() {
147+
if (seqNumberFieldIndex == -1) {
148+
return seqNumCounter.getValue() - 1;
149+
}
150+
// When hasNullSeqNumber is true, some records have null sequence numbers.
151+
// Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID for
152+
// max.
153+
return hasNullSeqNumber ? 0 : maxSeqNumber;
154+
}
155+
156+
private void updateSeqNumber(InternalRow row) {
157+
seqNumCounter.add(1L);
158+
159+
// If sequence number field exists, extract min/max from row data
160+
if (seqNumberFieldIndex != -1 && !row.isNullAt(seqNumberFieldIndex)) {
161+
long seqNum = row.getLong(seqNumberFieldIndex);
162+
minSeqNumber = Math.min(minSeqNumber, seqNum);
163+
maxSeqNumber = Math.max(maxSeqNumber, seqNum);
164+
} else if (seqNumberFieldIndex != -1) {
165+
// Manifest will calculate the correct max based on snapshot id
166+
hasNullSeqNumber = true;
167+
}
168+
}
127169
}

paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,20 @@ public static RowTrackingAssigned assignRowTracking(
5050
private static void assignSnapshotId(
5151
long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
5252
for (ManifestEntry entry : deltaFiles) {
53-
if (entry.file().minSequenceNumber() == 0L) {
53+
long minSeqNumber = entry.file().minSequenceNumber();
54+
long maxSeqNumber = entry.file().maxSequenceNumber();
55+
if (minSeqNumber == 0L) {
56+
// Case 1: New file (e.g., from INSERT)
57+
// All records in this file get the current snapshot ID as sequence number
5458
snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
59+
} else if (maxSeqNumber == 0L) {
60+
// Case 2: File with some modified records
61+
// - min: Preserve original sequence number (from unmodified records)
62+
// - max: Assign current snapshot ID
63+
snapshotAssigned.add(entry.assignSequenceNumber(minSeqNumber, snapshotId));
5564
} else {
65+
// Case 3: Pure compact file (no modified records)
66+
// Preserve original min/max sequence numbers from source files
5667
snapshotAssigned.add(entry);
5768
}
5869
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
import java.util.ArrayList;
8888
import java.util.Arrays;
8989
import java.util.Collections;
90-
import java.util.Comparator;
9190
import java.util.HashMap;
9291
import java.util.Iterator;
9392
import java.util.List;
@@ -460,13 +459,6 @@ private void compactUnAwareBucketTable(
460459
ser.deserialize(
461460
ser.getVersion(),
462461
taskIterator.next());
463-
if (coreOptions.rowTrackingEnabled()) {
464-
task.compactBefore()
465-
.sort(
466-
Comparator.comparingLong(
467-
DataFileMeta
468-
::minSequenceNumber));
469-
}
470462
messages.add(
471463
messageSer.serialize(
472464
task.doCompact(table, write)));

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,33 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
255255

256256
sql("INSERT INTO t VALUES (4, '4')")
257257
sql("INSERT INTO t VALUES (5, '5')")
258+
// snapshot 7: should merge files with sequence numbers [1, 6]
258259
sql("CALL sys.compact(table => 't')")
259260
checkAnswer(
260261
sql("SELECT min_sequence_number, max_sequence_number FROM `t$files`"),
261-
Seq(Row(1, 5))
262+
Seq(Row(1, 6))
263+
)
264+
// snapshot 8: Updated record has null sequence number
265+
sql("UPDATE t SET data = 22 WHERE id = 2")
266+
267+
// snapshot 9: INSERT adds a new file; snapshot 10: UPDATE rewrites its rows with null sequence numbers
268+
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(6, 8)")
269+
sql("UPDATE t SET data = 67 WHERE _SEQUENCE_NUMBER = 9")
270+
checkAnswer(
271+
sql(
272+
"SELECT min_sequence_number, max_sequence_number FROM `t$files` order by min_sequence_number"),
273+
Seq(Row(1, 8), Row(10, 10))
274+
)
275+
checkAnswer(
276+
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
277+
Seq(
278+
Row(1, 1, 0, 1),
279+
Row(2, 22, 1, 8),
280+
Row(3, 3, 2, 3),
281+
Row(4, 4, 3, 5),
282+
Row(5, 5, 4, 6),
283+
Row(6, 67, 5, 10),
284+
Row(7, 67, 6, 10))
262285
)
263286
}
264287
}

0 commit comments

Comments
 (0)