Skip to content

Commit 439db01

Browse files
zhuxiangyiclaude
andcommitted
[flink] Fix restore_as_latest overwrite delta
Ensure restore_as_latest writes an overwrite delta so streaming overwrite readers can observe restored file changes. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 1492902 commit 439db01

2 files changed

Lines changed: 84 additions & 8 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,17 +1171,61 @@ public boolean restoreAsLatest(Snapshot targetSnapshot) {
11711171
checkNotNull(
11721172
snapshotManager.latestSnapshot(),
11731173
"Latest snapshot is null, can not restore.");
1174+
1175+
Map<FileEntry.Identifier, ManifestEntry> latestEntries = new HashMap<>();
1176+
FileEntry.mergeEntries(
1177+
manifestFile,
1178+
manifestList.readDataManifests(latest),
1179+
latestEntries,
1180+
options.scanManifestParallelism());
1181+
1182+
latestEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
1183+
1184+
Map<FileEntry.Identifier, ManifestEntry> targetEntries = new HashMap<>();
1185+
FileEntry.mergeEntries(
1186+
manifestFile,
1187+
manifestList.readDataManifests(targetSnapshot),
1188+
targetEntries,
1189+
options.scanManifestParallelism());
1190+
targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
1191+
1192+
List<ManifestEntry> deltaFiles = new ArrayList<>();
1193+
for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry : latestEntries.entrySet()) {
1194+
if (!targetEntries.containsKey(entry.getKey())) {
1195+
ManifestEntry manifestEntry = entry.getValue();
1196+
deltaFiles.add(
1197+
ManifestEntry.create(
1198+
FileKind.DELETE,
1199+
manifestEntry.partition(),
1200+
manifestEntry.bucket(),
1201+
manifestEntry.totalBuckets(),
1202+
manifestEntry.file()));
1203+
}
1204+
}
1205+
for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry : targetEntries.entrySet()) {
1206+
if (!latestEntries.containsKey(entry.getKey())) {
1207+
ManifestEntry manifestEntry = entry.getValue();
1208+
deltaFiles.add(
1209+
ManifestEntry.create(
1210+
FileKind.ADD,
1211+
manifestEntry.partition(),
1212+
manifestEntry.bucket(),
1213+
manifestEntry.totalBuckets(),
1214+
manifestEntry.file()));
1215+
}
1216+
}
1217+
11741218
Pair<String, Long> baseManifestList =
1175-
manifestList.write(manifestList.readDataManifests(targetSnapshot));
1176-
Pair<String, Long> emptyDeltaManifestList = manifestList.write(emptyList());
1219+
manifestList.write(manifestFile.write(new ArrayList<>(latestEntries.values())));
1220+
Pair<String, Long> deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
11771221
Snapshot newSnapshot =
11781222
new Snapshot(
11791223
latest.id() + 1,
11801224
targetSnapshot.schemaId(),
11811225
baseManifestList.getKey(),
11821226
baseManifestList.getRight(),
1183-
emptyDeltaManifestList.getKey(),
1184-
emptyDeltaManifestList.getRight(),
1227+
deltaManifestList.getKey(),
1228+
deltaManifestList.getRight(),
11851229
null,
11861230
null,
11871231
targetSnapshot.indexManifest(),
@@ -1190,14 +1234,14 @@ public boolean restoreAsLatest(Snapshot targetSnapshot) {
11901234
CommitKind.OVERWRITE,
11911235
System.currentTimeMillis(),
11921236
targetSnapshot.totalRecordCount(),
1193-
0L,
1237+
recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles),
11941238
null,
11951239
targetSnapshot.watermark(),
11961240
targetSnapshot.statistics(),
11971241
targetSnapshot.properties(),
11981242
targetSnapshot.nextRowId());
11991243

1200-
return commitSnapshotImpl(newSnapshot, emptyList());
1244+
return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));
12011245
}
12021246

12031247
public void compactManifest() {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818

1919
package org.apache.paimon.flink.procedure;
2020

21+
import org.apache.paimon.Snapshot;
2122
import org.apache.paimon.data.BinaryString;
2223
import org.apache.paimon.data.GenericRow;
2324
import org.apache.paimon.flink.CatalogITCaseBase;
25+
import org.apache.paimon.manifest.ManifestFileMeta;
26+
import org.apache.paimon.manifest.ManifestList;
2427
import org.apache.paimon.table.FileStoreTable;
2528
import org.apache.paimon.table.sink.BatchTableCommit;
2629
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -30,6 +33,8 @@
3033
import org.apache.flink.types.Row;
3134
import org.junit.jupiter.api.Test;
3235

36+
import java.util.List;
37+
3338
import static org.assertj.core.api.Assertions.assertThat;
3439
import static org.junit.jupiter.api.Assertions.assertEquals;
3540
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -53,14 +58,24 @@ public void testRestoreSnapshotAsLatest() throws Exception {
5358
.containsExactly(Row.of(3L, 1L, 4L));
5459

5560
assertEquals(4, snapshotManager.latestSnapshotId());
61+
assertRestoreDelta(table, 4, 0, 2, -2L);
5662
assertTrue(snapshotManager.snapshotExists(2));
5763
assertTrue(snapshotManager.snapshotExists(3));
5864
assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
5965

60-
commitRow(table, 4, "d");
66+
assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3)"))
67+
.containsExactly(Row.of(4L, 3L, 5L));
68+
6169
assertEquals(5, snapshotManager.latestSnapshotId());
70+
assertRestoreDelta(table, 5, 2, 0, 2L);
71+
assertThat(sql("SELECT * FROM T"))
72+
.containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"));
73+
74+
commitRow(table, 4, "d");
75+
assertEquals(6, snapshotManager.latestSnapshotId());
6276
assertThat(sql("SELECT * FROM T"))
63-
.containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(4, "d"));
77+
.containsExactlyInAnyOrder(
78+
Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"), Row.of(4, "d"));
6479
}
6580

6681
@Test
@@ -81,11 +96,28 @@ public void testRestoreTagAsLatest() throws Exception {
8196
.containsExactly(Row.of(3L, 1L, 4L));
8297

8398
assertEquals(4, snapshotManager.latestSnapshotId());
99+
assertRestoreDelta(table, 4, 0, 2, -2L);
84100
assertTrue(snapshotManager.snapshotExists(2));
85101
assertTrue(snapshotManager.snapshotExists(3));
86102
assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
87103
}
88104

105+
private void assertRestoreDelta(
106+
FileStoreTable table,
107+
long snapshotId,
108+
long expectedNumAddedFiles,
109+
long expectedNumDeletedFiles,
110+
long expectedDeltaRecordCount) {
111+
Snapshot snapshot = table.snapshot(snapshotId);
112+
ManifestList manifestList = table.store().manifestListFactory().create();
113+
List<ManifestFileMeta> deltaManifests = manifestList.readDeltaManifests(snapshot);
114+
115+
assertThat(deltaManifests).hasSize(1);
116+
assertThat(deltaManifests.get(0).numAddedFiles()).isEqualTo(expectedNumAddedFiles);
117+
assertThat(deltaManifests.get(0).numDeletedFiles()).isEqualTo(expectedNumDeletedFiles);
118+
assertThat(snapshot.deltaRecordCount()).isEqualTo(expectedDeltaRecordCount);
119+
}
120+
89121
private void commitRow(FileStoreTable table, int id, String name) throws Exception {
90122
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
91123
try (BatchTableWrite write = writeBuilder.newWrite();

0 commit comments

Comments
 (0)