Skip to content

Commit 22f8666

Browse files
kevinjqliuCopilot
andauthored
Flink: Backport PR #16324 to v2.0 and v1.20 (#16338)
Agent-Logs-Url: https://github.com/kevinjqliu/iceberg/sessions/682ce8b4-890f-41a9-a89a-b1f2873be44c Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: kevinjqliu <9057843+kevinjqliu@users.noreply.github.com>
1 parent bf340af commit 22f8666

4 files changed

Lines changed: 72 additions & 0 deletions

File tree

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void open(OpenContext openContext) throws Exception {
6666
public void processElement(Trigger trigger, Context ctx, Collector<String> collector)
6767
throws Exception {
6868
try {
69+
table.refresh();
6970
table
7071
.snapshots()
7172
.forEach(

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.List;
2424
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2525
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
26+
import org.apache.iceberg.Snapshot;
2627
import org.apache.iceberg.Table;
2728
import org.apache.iceberg.flink.maintenance.api.Trigger;
29+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2830
import org.junit.jupiter.api.Test;
2931

3032
class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ void testMetadataFilesWithEmptyTable() throws Exception {
8789
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
8890
}
8991
}
92+
93+
@Test
94+
void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
95+
Table table = createTable();
96+
insert(table, 1, "a");
97+
98+
try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
99+
ProcessFunctionTestHarnesses.forProcessFunction(
100+
new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
101+
testHarness.open();
102+
103+
// Add more snapshots AFTER the operator has been opened
104+
insert(table, 2, "b");
105+
insert(table, 3, "c");
106+
107+
OperatorTestBase.trigger(testHarness);
108+
109+
List<String> tableMetadataFiles = testHarness.extractOutputValues();
110+
111+
// Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
112+
// Without table.refresh() in processElement, only snapshot 1's files would be emitted.
113+
table.refresh();
114+
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
115+
assertThat(snapshots).hasSize(3);
116+
for (Snapshot snapshot : snapshots) {
117+
assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
118+
}
119+
// Verify total count matches what 3 snapshots should produce
120+
assertThat(tableMetadataFiles).hasSize(24);
121+
122+
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
123+
}
124+
}
90125
}

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ListMetadataFiles.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void open(OpenContext openContext) throws Exception {
6666
public void processElement(Trigger trigger, Context ctx, Collector<String> collector)
6767
throws Exception {
6868
try {
69+
table.refresh();
6970
table
7071
.snapshots()
7172
.forEach(

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.List;
2424
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
2525
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
26+
import org.apache.iceberg.Snapshot;
2627
import org.apache.iceberg.Table;
2728
import org.apache.iceberg.flink.maintenance.api.Trigger;
29+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2830
import org.junit.jupiter.api.Test;
2931

3032
class TestListMetadataFiles extends OperatorTestBase {
@@ -87,4 +89,37 @@ void testMetadataFilesWithEmptyTable() throws Exception {
8789
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
8890
}
8991
}
92+
93+
@Test
94+
void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
95+
Table table = createTable();
96+
insert(table, 1, "a");
97+
98+
try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
99+
ProcessFunctionTestHarnesses.forProcessFunction(
100+
new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, tableLoader()))) {
101+
testHarness.open();
102+
103+
// Add more snapshots AFTER the operator has been opened
104+
insert(table, 2, "b");
105+
insert(table, 3, "c");
106+
107+
OperatorTestBase.trigger(testHarness);
108+
109+
List<String> tableMetadataFiles = testHarness.extractOutputValues();
110+
111+
// Verify that manifest lists from ALL 3 snapshots are present, not just the first one.
112+
// Without table.refresh() in processElement, only snapshot 1's files would be emitted.
113+
table.refresh();
114+
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
115+
assertThat(snapshots).hasSize(3);
116+
for (Snapshot snapshot : snapshots) {
117+
assertThat(tableMetadataFiles).contains(snapshot.manifestListLocation());
118+
}
119+
// Verify total count matches what 3 snapshots should produce
120+
assertThat(tableMetadataFiles).hasSize(24);
121+
122+
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
123+
}
124+
}
90125
}

0 commit comments

Comments
 (0)