Skip to content

Commit ecd0935

Browse files
committed
[flink] Add restore_as_latest procedure
1 parent cc9bb8e commit ecd0935

9 files changed

Lines changed: 352 additions & 2 deletions

File tree

docs/docs/flink/procedures.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,34 @@ All available procedures are listed below.
498498
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
499499
</td>
500500
</tr>
501+
<tr>
502+
<td>restore_as_latest</td>
503+
<td>
504+
-- for Flink 1.18<br/>
505+
-- restore a snapshot as the latest snapshot<br/>
506+
CALL [catalog.]sys.restore_as_latest('identifier', cast(null as string), snapshotId)<br/><br/>
507+
-- restore a tag as the latest snapshot<br/>
508+
CALL [catalog.]sys.restore_as_latest('identifier', 'tagName', cast(null as bigint))<br/><br/>
509+
-- for Flink 1.19 and later<br/>
510+
-- restore a snapshot as the latest snapshot<br/>
511+
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
512+
-- restore a tag as the latest snapshot<br/>
513+
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', tag => 'tagName')
514+
</td>
515+
<td>
516+
Restore a specific version of the target table as a new latest snapshot, without deleting later snapshots or tags. Exactly one of snapshot_id and tag must be specified.
517+
Argument:
518+
<li>table: the target table identifier. Cannot be empty.</li>
519+
<li>snapshotId (Long): id of the snapshot to restore from.</li>
520+
<li>tagName: name of the tag to restore from.</li>
521+
</td>
522+
<td>
523+
-- for Flink 1.18<br/>
524+
CALL sys.restore_as_latest('default.T', cast(null as string), 10)<br/><br/>
525+
-- for Flink 1.19 and later<br/>
526+
CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 10)
527+
</td>
528+
</tr>
501529
<tr>
502530
<td>rollback_to_timestamp</td>
503531
<td>

docs/docs/maintenance/manage-snapshots.mdx

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snapshot_id);
353353
354354
</Tabs>
355355
356+
## Restore Snapshot as Latest
357+
358+
Restore a table to the state of a specific snapshot ID by creating a new latest snapshot. Unlike rollback, this operation
359+
does not delete snapshots or tags whose snapshot id is larger than the restored snapshot.
360+
361+
<Tabs groupId="restore-as-latest">
362+
363+
<TabItem value="flink-sql" label="Flink SQL">
364+
365+
Run the following command:
366+
367+
```sql
368+
CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => <snapshot-id>);
369+
```
370+
371+
</TabItem>
372+
373+
</Tabs>
374+
356375
## Remove Orphan Files
357376
358377
Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
@@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database.
402421
403422
</TabItem>
404423
405-
</Tabs>
424+
</Tabs>

docs/docs/maintenance/manage-tags.mdx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2');
300300
</TabItem>
301301
302302
</Tabs>
303+
304+
## Restore Tag as Latest
305+
306+
Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not
307+
delete snapshots or tags whose snapshot id is larger than that of the restored tag.
308+
309+
<Tabs groupId="restore-as-latest">
310+
311+
<TabItem value="flink-sql" label="Flink SQL">
312+
313+
Run the following command:
314+
315+
```sql
316+
CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name');
317+
```
318+
319+
</TabItem>
320+
321+
</Tabs>

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ int overwritePartition(
7474
/** Compact the manifest entries only. */
7575
void compactManifest();
7676

77+
/**
 * Restore the target snapshot as the latest snapshot.
 *
 * @param targetSnapshot the snapshot to restore as the latest snapshot
 * @return whether the restore was committed (presumably {@code false} when the commit did not
 *     go through; confirm against the implementation)
 */
78+
boolean restoreAsLatest(Snapshot targetSnapshot);
79+
7780
/** Abort an unsuccessful commit. The data files will be deleted. */
7881
void abort(List<CommitMessage> commitMessages);
7982

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
108108
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
109109
import static org.apache.paimon.utils.Preconditions.checkArgument;
110+
import static org.apache.paimon.utils.Preconditions.checkNotNull;
110111

111112
/**
112113
* Default implementation of {@link FileStoreCommit}.
@@ -1164,6 +1165,41 @@ public boolean replaceManifestList(
11641165
return commitSnapshotImpl(newSnapshot, emptyList());
11651166
}
11661167

1168+
@Override
1169+
public boolean restoreAsLatest(Snapshot targetSnapshot) {
1170+
Snapshot latest =
1171+
checkNotNull(
1172+
snapshotManager.latestSnapshot(),
1173+
"Latest snapshot is null, can not restore.");
1174+
Pair<String, Long> baseManifestList =
1175+
manifestList.write(manifestList.readDataManifests(targetSnapshot));
1176+
Pair<String, Long> emptyDeltaManifestList = manifestList.write(emptyList());
1177+
Snapshot newSnapshot =
1178+
new Snapshot(
1179+
latest.id() + 1,
1180+
targetSnapshot.schemaId(),
1181+
baseManifestList.getKey(),
1182+
baseManifestList.getRight(),
1183+
emptyDeltaManifestList.getKey(),
1184+
emptyDeltaManifestList.getRight(),
1185+
null,
1186+
null,
1187+
targetSnapshot.indexManifest(),
1188+
commitUser,
1189+
Long.MAX_VALUE,
1190+
CommitKind.OVERWRITE,
1191+
System.currentTimeMillis(),
1192+
targetSnapshot.totalRecordCount(),
1193+
0L,
1194+
null,
1195+
targetSnapshot.watermark(),
1196+
targetSnapshot.statistics(),
1197+
targetSnapshot.properties(),
1198+
targetSnapshot.nextRowId());
1199+
1200+
return commitSnapshotImpl(newSnapshot, emptyList());
1201+
}
1202+
11671203
public void compactManifest() {
11681204
int retryCount = 0;
11691205
long startMillis = System.currentTimeMillis();

paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.table.sink;
2020

21+
import org.apache.paimon.Snapshot;
2122
import org.apache.paimon.annotation.VisibleForTesting;
2223
import org.apache.paimon.consumer.ConsumerManager;
2324
import org.apache.paimon.fs.Path;
@@ -203,6 +204,15 @@ public void compactManifests() {
203204
commit.compactManifest();
204205
}
205206

207+
public boolean restoreAsLatest(Snapshot targetSnapshot) {
208+
checkCommitted();
209+
boolean success = commit.restoreAsLatest(targetSnapshot);
210+
if (success) {
211+
maintain(COMMIT_IDENTIFIER, maintainExecutor, true);
212+
}
213+
return success;
214+
}
215+
206216
private void checkCommitted() {
207217
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
208218
batchCommitted = true;
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.procedure;
20+
21+
import org.apache.paimon.FileStore;
22+
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.catalog.Catalog;
24+
import org.apache.paimon.catalog.Identifier;
25+
import org.apache.paimon.table.FileStoreTable;
26+
import org.apache.paimon.table.Table;
27+
import org.apache.paimon.table.sink.TableCommitImpl;
28+
import org.apache.paimon.utils.Preconditions;
29+
import org.apache.paimon.utils.SnapshotManager;
30+
import org.apache.paimon.utils.StringUtils;
31+
32+
import org.apache.flink.table.annotation.ArgumentHint;
33+
import org.apache.flink.table.annotation.DataTypeHint;
34+
import org.apache.flink.table.annotation.ProcedureHint;
35+
import org.apache.flink.table.procedure.ProcedureContext;
36+
import org.apache.flink.types.Row;
37+
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.SortedMap;
41+
import java.util.UUID;
42+
43+
/**
44+
* Restore as latest procedure. Usage:
45+
*
46+
* <pre><code>
47+
* -- restore a snapshot as the latest snapshot
48+
* CALL sys.restore_as_latest(`table` => 'tableId', snapshot_id => snapshotId)
49+
*
50+
* -- restore a tag as the latest snapshot
51+
* CALL sys.restore_as_latest(`table` => 'tableId', tag => 'tagName')
52+
* </code></pre>
53+
*/
54+
public class RestoreAsLatestProcedure extends ProcedureBase {
55+
56+
public static final String IDENTIFIER = "restore_as_latest";
57+
58+
@ProcedureHint(
59+
argument = {
60+
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
61+
@ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
62+
@ArgumentHint(
63+
name = "snapshot_id",
64+
type = @DataTypeHint("BIGINT"),
65+
isOptional = true)
66+
})
67+
public @DataTypeHint(
68+
"ROW<previous_snapshot_id BIGINT, restored_snapshot_id BIGINT, current_snapshot_id BIGINT>")
69+
Row[] call(
70+
ProcedureContext procedureContext,
71+
String tableId,
72+
String tagName,
73+
Long snapshotId)
74+
throws Catalog.TableNotExistException {
75+
Table table = catalog.getTable(Identifier.fromString(tableId));
76+
FileStoreTable fileStoreTable = (FileStoreTable) table;
77+
78+
FileStore<?> store = fileStoreTable.store();
79+
Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
80+
Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not restore.");
81+
82+
boolean hasTag = !StringUtils.isNullOrWhitespaceOnly(tagName);
83+
boolean hasSnapshot = snapshotId != null;
84+
Preconditions.checkArgument(
85+
hasTag != hasSnapshot, "Must specify exactly one of tag and snapshot_id.");
86+
87+
Snapshot targetSnapshot;
88+
if (hasTag) {
89+
targetSnapshot = store.newTagManager().getOrThrow(tagName).trimToSnapshot();
90+
} else {
91+
targetSnapshot = findSnapshot(store, snapshotId);
92+
}
93+
94+
try (TableCommitImpl commit =
95+
fileStoreTable.newCommit("restore-as-latest-" + UUID.randomUUID().toString())) {
96+
Preconditions.checkState(
97+
commit.restoreAsLatest(targetSnapshot),
98+
"Failed to restore snapshot %s as latest.",
99+
targetSnapshot.id());
100+
} catch (Exception e) {
101+
throw new RuntimeException(
102+
String.format("Failed to restore snapshot %s as latest.", targetSnapshot.id()),
103+
e);
104+
}
105+
106+
return new Row[] {
107+
Row.of(
108+
latestSnapshot.id(),
109+
targetSnapshot.id(),
110+
store.snapshotManager().latestSnapshotId())
111+
};
112+
}
113+
114+
private Snapshot findSnapshot(FileStore<?> store, long snapshotId) {
115+
SnapshotManager snapshotManager = store.snapshotManager();
116+
if (snapshotManager.snapshotExists(snapshotId)) {
117+
return snapshotManager.snapshot(snapshotId);
118+
}
119+
120+
SortedMap<Snapshot, List<String>> tags = store.newTagManager().tags();
121+
for (Map.Entry<Snapshot, List<String>> entry : tags.entrySet()) {
122+
if (entry.getKey().id() == snapshotId) {
123+
return entry.getKey();
124+
} else if (entry.getKey().id() > snapshotId) {
125+
break;
126+
}
127+
}
128+
129+
throw new IllegalArgumentException(
130+
String.format("Restore snapshot '%s' doesn't exist.", snapshotId));
131+
}
132+
133+
@Override
134+
public String identifier() {
135+
return IDENTIFIER;
136+
}
137+
}

paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
6969
org.apache.paimon.flink.procedure.MergeIntoProcedure
7070
org.apache.paimon.flink.procedure.ResetConsumerProcedure
7171
org.apache.paimon.flink.procedure.RollbackToProcedure
72+
org.apache.paimon.flink.procedure.RestoreAsLatestProcedure
7273
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
7374
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
7475
org.apache.paimon.flink.procedure.MigrateTableProcedure
@@ -106,4 +107,4 @@ org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
106107
org.apache.paimon.flink.procedure.ReassignRowIdProcedure
107108
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
108109
org.apache.paimon.flink.procedure.VectorSearchProcedure
109-
org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
110+
org.apache.paimon.flink.procedure.DropGlobalIndexProcedure

0 commit comments

Comments
 (0)