Skip to content

Commit dfa121b

Browse files
[iceberg] Enhance Iceberg snapshot summary (#6370)
1 parent a4f5b28 commit dfa121b

5 files changed

Lines changed: 70 additions & 19 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
348348
null);
349349

350350
// Tags can only be included in Iceberg if they point to an Iceberg snapshot that
351-
// exists. Otherwise an Iceberg client fails to parse the metadata and all reads fail.
351+
// exists. Otherwise, an Iceberg client fails to parse the metadata and all reads fail.
352352
// Only the latest snapshot ID is added to Iceberg in this code path. Since this snapshot
353353
// has just been committed to Paimon, it is not possible for any Paimon tag to reference it
354354
// yet.

paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
package org.apache.paimon.iceberg.metadata;
2020

2121
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22-
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
2322
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
24-
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
23+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonValue;
2524

25+
import java.util.HashMap;
26+
import java.util.Map;
2627
import java.util.Objects;
2728

2829
/**
@@ -38,22 +39,38 @@ public class IcebergSnapshotSummary {
3839
public static final IcebergSnapshotSummary APPEND = new IcebergSnapshotSummary("append");
3940
public static final IcebergSnapshotSummary OVERWRITE = new IcebergSnapshotSummary("overwrite");
4041

41-
@JsonProperty(FIELD_OPERATION)
42-
private final String operation;
42+
private final Map<String, String> summary;
4343

4444
@JsonCreator
45-
public IcebergSnapshotSummary(@JsonProperty(FIELD_OPERATION) String operation) {
46-
this.operation = operation;
45+
public IcebergSnapshotSummary(Map<String, String> summary) {
46+
this.summary = summary != null ? new HashMap<>(summary) : new HashMap<>();
47+
}
48+
49+
public IcebergSnapshotSummary(String operation) {
50+
this.summary = new HashMap<>();
51+
this.summary.put(FIELD_OPERATION, operation);
52+
}
53+
54+
@JsonValue
55+
public Map<String, String> getSummary() {
56+
return new HashMap<>(summary);
4757
}
4858

49-
@JsonGetter(FIELD_OPERATION)
5059
public String operation() {
51-
return operation;
60+
return summary.get(FIELD_OPERATION);
61+
}
62+
63+
public String get(String key) {
64+
return summary.get(key);
65+
}
66+
67+
public void put(String key, String value) {
68+
summary.put(key, value);
5269
}
5370

5471
@Override
5572
public int hashCode() {
56-
return Objects.hash(operation);
73+
return Objects.hash(summary);
5774
}
5875

5976
@Override
@@ -66,6 +83,6 @@ public boolean equals(Object o) {
6683
}
6784

6885
IcebergSnapshotSummary that = (IcebergSnapshotSummary) o;
69-
return Objects.equals(operation, that.operation);
86+
return Objects.equals(summary, that.summary);
7087
}
7188
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder
169169
typeInfo.f0, typeInfo.f1, typeInfo.f2, typeMapping);
170170
schemaBuilder.column(originalName, paimonDataType);
171171

172-
String filedValue = Objects.toString(recordMap.get(originalName), null);
173-
String newValue = transformValue(filedValue, typeInfo.f0, originalType);
172+
String fieldValue = Objects.toString(recordMap.get(originalName), null);
173+
String newValue = transformValue(fieldValue, typeInfo.f0, originalType);
174174
rowData.put(originalName, newValue);
175175
}
176176
} else {

paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,17 @@ private static ParquetField constructField(
168168
int repetitionLevel = columnIO.getRepetitionLevel();
169169
int definitionLevel = columnIO.getDefinitionLevel();
170170
DataType type = dataField.type();
171-
String filedName = dataField.name();
171+
String fieldName = dataField.name();
172172
if (type instanceof RowType) {
173173
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
174174
RowType rowType = (RowType) type;
175175
ImmutableList.Builder<ParquetField> fieldsBuilder = ImmutableList.builder();
176176
List<String> fieldNames = rowType.getFieldNames();
177-
List<DataField> childrens = rowType.getFields();
178-
for (int i = 0; i < childrens.size(); i++) {
177+
List<DataField> children = rowType.getFields();
178+
for (int i = 0; i < children.size(); i++) {
179179
fieldsBuilder.add(
180180
constructField(
181-
childrens.get(i),
181+
children.get(i),
182182
lookupColumnByName(groupColumnIO, fieldNames.get(i))));
183183
}
184184

@@ -281,8 +281,8 @@ private static ParquetField constructField(
281281
ColumnIO elementTypeColumnIO;
282282
if (columnIO instanceof GroupColumnIO) {
283283
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
284-
if (!StringUtils.isNullOrWhitespaceOnly(filedName)) {
285-
while (!Objects.equals(groupColumnIO.getName(), filedName)) {
284+
if (!StringUtils.isNullOrWhitespaceOnly(fieldName)) {
285+
while (!Objects.equals(groupColumnIO.getName(), fieldName)) {
286286
groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
287287
}
288288
elementTypeColumnIO = groupColumnIO;

paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,40 @@ void testFormatVersionV3Table() throws Exception {
483483
assertThat(paimonIcebergMetadata2.snapshots().get(1).sequenceNumber()).isEqualTo(2L);
484484
}
485485

486+
@Test
487+
@DisplayName("Test reading metadata with snapshot summary")
488+
void testReadMetadataWithSnapshotSummary() throws Exception {
489+
// Create a basic Iceberg table
490+
Table icebergTable = createBasicIcebergTable("snapshot_summary_table");
491+
492+
// Perform append operation
493+
icebergTable
494+
.newFastAppend()
495+
.appendFile(
496+
DataFiles.builder(PartitionSpec.unpartitioned())
497+
.withPath("/path/to/append-data.parquet")
498+
.withFileSizeInBytes(100)
499+
.withRecordCount(10)
500+
.build())
501+
.commit();
502+
503+
// Read metadata after append operation
504+
IcebergMetadata paimonIcebergMetadata = readIcebergMetadata("snapshot_summary_table");
505+
506+
// Verify snapshots
507+
assertThat(paimonIcebergMetadata.snapshots()).hasSize(1);
508+
IcebergSnapshot snapshot = paimonIcebergMetadata.snapshots().get(0);
509+
assertThat(snapshot.snapshotId()).isNotNull();
510+
assertThat(snapshot.parentSnapshotId()).isNull();
511+
512+
// Verify snapshot summary contains operation information
513+
assertThat(snapshot.summary().operation()).isEqualTo("append");
514+
assertThat(snapshot.summary().getSummary().get("operation")).isEqualTo("append");
515+
assertThat(snapshot.summary().getSummary().get("total-records")).isEqualTo("10");
516+
assertThat(snapshot.summary().getSummary().get("added-data-files")).isEqualTo("1");
517+
assertThat(snapshot.summary().getSummary().get("added-files-size")).isEqualTo("100");
518+
}
519+
486520
/** Helper method to create a basic Iceberg table with simple schema. */
487521
private Table createBasicIcebergTable(String tableName) {
488522
TableIdentifier identifier = TableIdentifier.of("testdb", tableName);

0 commit comments

Comments (0)