Skip to content

Commit 4a83228

Browse files
[iceberg] Enhance iceberg snapshot metadata. (#6354)
1 parent: 7962e22 · commit: 4a83228

6 files changed

Lines changed: 689 additions & 6 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -339,10 +339,13 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
339339
new IcebergSnapshot(
340340
snapshotId,
341341
snapshotId,
342+
null,
342343
System.currentTimeMillis(),
343344
IcebergSnapshotSummary.APPEND,
344345
pathFactory.toManifestListPath(manifestListFileName).toString(),
345-
schemaId);
346+
schemaId,
347+
null,
348+
null);
346349

347350
// Tags can only be included in Iceberg if they point to an Iceberg snapshot that
348351
// exists. Otherwise an Iceberg client fails to parse the metadata and all reads fail.
@@ -599,10 +602,13 @@ private void createMetadataWithBase(
599602
new IcebergSnapshot(
600603
snapshotId,
601604
snapshotId,
605+
snapshotId - 1,
602606
System.currentTimeMillis(),
603607
snapshotSummary,
604608
pathFactory.toManifestListPath(manifestListFileName).toString(),
605-
schemaId));
609+
schemaId,
610+
null,
611+
null));
606612

607613
// all snapshots in this list, except the last one, need to expire
608614
List<IcebergSnapshot> toExpireExceptLast = new ArrayList<>();

paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -319,6 +319,7 @@ public int hashCode() {
319319
defaultSortOrderId,
320320
snapshots,
321321
currentSnapshotId,
322+
properties,
322323
refs);
323324
}
324325

@@ -347,6 +348,7 @@ public boolean equals(Object o) {
347348
&& defaultSortOrderId == that.defaultSortOrderId
348349
&& Objects.equals(snapshots, that.snapshots)
349350
&& currentSnapshotId == that.currentSnapshotId
351+
&& Objects.equals(properties, that.properties)
350352
&& Objects.equals(refs, that.refs);
351353
}
352354
}

paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshot.java

Lines changed: 56 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,11 @@
2121
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2222
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
2323
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
2425
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
2526

27+
import javax.annotation.Nullable;
28+
2629
import java.util.Objects;
2730

2831
/**
@@ -35,17 +38,25 @@ public class IcebergSnapshot {
3538

3639
private static final String FIELD_SEQUENCE_NUMBER = "sequence-number";
3740
private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
41+
private static final String FIELD_PARENT_SNAPSHOT_ID = "parent-snapshot-id";
3842
private static final String FIELD_TIMESTAMP_MS = "timestamp-ms";
3943
private static final String FIELD_SUMMARY = "summary";
4044
private static final String FIELD_MANIFEST_LIST = "manifest-list";
4145
private static final String FIELD_SCHEMA_ID = "schema-id";
46+
private static final String FIELD_FIRST_ROW_ID = "first-row-id";
47+
private static final String FIELD_ADDED_ROWS = "added-rows";
4248

4349
@JsonProperty(FIELD_SEQUENCE_NUMBER)
4450
private final long sequenceNumber;
4551

4652
@JsonProperty(FIELD_SNAPSHOT_ID)
4753
private final long snapshotId;
4854

55+
@JsonProperty(FIELD_PARENT_SNAPSHOT_ID)
56+
@JsonInclude(JsonInclude.Include.NON_NULL)
57+
@Nullable
58+
private final Long parentSnapshotId;
59+
4960
@JsonProperty(FIELD_TIMESTAMP_MS)
5061
private final long timestampMs;
5162

@@ -58,20 +69,36 @@ public class IcebergSnapshot {
5869
@JsonProperty(FIELD_SCHEMA_ID)
5970
private final int schemaId;
6071

72+
@JsonProperty(FIELD_FIRST_ROW_ID)
73+
@JsonInclude(JsonInclude.Include.NON_NULL)
74+
@Nullable
75+
private final Long firstRowId;
76+
77+
@JsonProperty(FIELD_ADDED_ROWS)
78+
@JsonInclude(JsonInclude.Include.NON_NULL)
79+
@Nullable
80+
private final Long addedRows;
81+
6182
@JsonCreator
6283
public IcebergSnapshot(
6384
@JsonProperty(FIELD_SEQUENCE_NUMBER) long sequenceNumber,
6485
@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
86+
@JsonProperty(FIELD_PARENT_SNAPSHOT_ID) Long parentSnapshotId,
6587
@JsonProperty(FIELD_TIMESTAMP_MS) long timestampMs,
6688
@JsonProperty(FIELD_SUMMARY) IcebergSnapshotSummary summary,
6789
@JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
68-
@JsonProperty(FIELD_SCHEMA_ID) int schemaId) {
90+
@JsonProperty(FIELD_SCHEMA_ID) int schemaId,
91+
@JsonProperty(FIELD_FIRST_ROW_ID) Long firstRowId,
92+
@JsonProperty(FIELD_ADDED_ROWS) Long addedRows) {
6993
this.sequenceNumber = sequenceNumber;
7094
this.snapshotId = snapshotId;
95+
this.parentSnapshotId = parentSnapshotId;
7196
this.timestampMs = timestampMs;
7297
this.summary = summary;
7398
this.manifestList = manifestList;
7499
this.schemaId = schemaId;
100+
this.firstRowId = firstRowId;
101+
this.addedRows = addedRows;
75102
}
76103

77104
@JsonGetter(FIELD_SEQUENCE_NUMBER)
@@ -104,10 +131,33 @@ public int schemaId() {
104131
return schemaId;
105132
}
106133

134+
@JsonGetter(FIELD_PARENT_SNAPSHOT_ID)
135+
public Long parentSnapshotId() {
136+
return parentSnapshotId;
137+
}
138+
139+
@JsonGetter(FIELD_ADDED_ROWS)
140+
public Long addedRows() {
141+
return addedRows;
142+
}
143+
144+
@JsonGetter(FIELD_FIRST_ROW_ID)
145+
public Long firstRowId() {
146+
return firstRowId;
147+
}
148+
107149
@Override
108150
public int hashCode() {
109151
return Objects.hash(
110-
sequenceNumber, snapshotId, timestampMs, summary, manifestList, schemaId);
152+
sequenceNumber,
153+
snapshotId,
154+
parentSnapshotId,
155+
timestampMs,
156+
summary,
157+
manifestList,
158+
schemaId,
159+
addedRows,
160+
firstRowId);
111161
}
112162

113163
@Override
@@ -122,9 +172,12 @@ public boolean equals(Object o) {
122172
IcebergSnapshot that = (IcebergSnapshot) o;
123173
return sequenceNumber == that.sequenceNumber
124174
&& snapshotId == that.snapshotId
175+
&& Objects.equals(parentSnapshotId, that.parentSnapshotId)
125176
&& timestampMs == that.timestampMs
126177
&& Objects.equals(summary, that.summary)
127178
&& Objects.equals(manifestList, that.manifestList)
128-
&& schemaId == that.schemaId;
179+
&& schemaId == that.schemaId
180+
&& Objects.equals(addedRows, that.addedRows)
181+
&& Objects.equals(firstRowId, that.firstRowId);
129182
}
130183
}

paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -374,10 +374,13 @@ private IcebergMetadata adjustMetadataForRest(IcebergMetadata newIcebergMetadata
374374
new IcebergSnapshot(
375375
snapshot.sequenceNumber(),
376376
snapshot.snapshotId(),
377+
snapshot.parentSnapshotId(),
377378
snapshot.timestampMs(),
378379
snapshot.summary(),
379380
snapshot.manifestList(),
380-
snapshot.schemaId() + 1))
381+
snapshot.schemaId() + 1,
382+
snapshot.firstRowId(),
383+
snapshot.addedRows()))
381384
.collect(Collectors.toList());
382385
return new IcebergMetadata(
383386
newIcebergMetadata.formatVersion(),

0 commit comments

Comments
 (0)