Skip to content

Commit 13cd198

Browse files
authored
[FLINK-39363][test] Fix flaky test PaimonSinkITCase (#4366)
1 parent b48d8eb commit 13cd198

1 file changed

Lines changed: 8 additions & 9 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public class PaimonSinkITCase {
133133
private final TableId table1 = TableId.tableId("test", "table1");
134134
private final TableId table2 = TableId.tableId("test", "table2");
135135

136-
private static int checkpointId = 1;
136+
private int checkpointId = 1;
137137

138138
public static final String TEST_DATABASE = "test";
139139
private static final String HADOOP_CONF_DIR =
@@ -756,18 +756,17 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector
756756
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1"));
757757
}
758758

759-
private static void commit(
760-
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
759+
private void commit(PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
761760
throws IOException, InterruptedException {
762761
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
763762
writer.prepareCommit().stream()
764-
.map(PaimonSinkITCase::correctCheckpointId)
763+
.map(this::correctCheckpointId)
765764
.map(MockCommitRequestImpl::new)
766765
.collect(Collectors.toList());
767766
committer.commit(commitRequests);
768767
}
769768

770-
private static void writeAndCommit(
769+
private void writeAndCommit(
771770
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer, Event... events)
772771
throws IOException, InterruptedException {
773772
for (Event event : events) {
@@ -777,7 +776,7 @@ private static void writeAndCommit(
777776
commit(writer, committer);
778777
}
779778

780-
private static void writeAndCommit(
779+
private void writeAndCommit(
781780
BucketAssignOperator bucketAssignOperator,
782781
PaimonWriter<Event> writer,
783782
Committer<MultiTableCommittable> committer,
@@ -868,7 +867,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
868867
writer.flush(false);
869868
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
870869
writer.prepareCommit().stream()
871-
.map(PaimonSinkITCase::correctCheckpointId)
870+
.map(this::correctCheckpointId)
872871
.map(MockCommitRequestImpl::new)
873872
.collect(Collectors.toList());
874873
committer.commit(commitRequests);
@@ -900,7 +899,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
900899
// Checkpoint id start from 1
901900
committer.commit(
902901
writer.prepareCommit().stream()
903-
.map(PaimonSinkITCase::correctCheckpointId)
902+
.map(this::correctCheckpointId)
904903
.map(MockCommitRequestImpl::new)
905904
.collect(Collectors.toList()));
906905
}
@@ -1039,7 +1038,7 @@ private void runJobWithEvents(List<Event> events, boolean isBatchMode) throws Ex
10391038
env.execute("runJobWithEvents").getJobExecutionResult();
10401039
}
10411040

1042-
private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
1041+
private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
10431042
// update the right checkpointId for MultiTableCommittable
10441043
return new MultiTableCommittable(
10451044
committable.getDatabase(),

0 commit comments

Comments
 (0)