Skip to content

Commit 0307b78

Browse files
committed
[FLINK-39363][Test] Fix flaky test PaimonSinkITCase
1 parent b48d8eb commit 0307b78

1 file changed

Lines changed: 14 additions & 9 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.apache.paimon.flink.sink.MultiTableCommittable;
8989
import org.apache.paimon.options.Options;
9090
import org.assertj.core.api.Assertions;
91+
import org.junit.jupiter.api.BeforeEach;
9192
import org.junit.jupiter.api.io.TempDir;
9293
import org.junit.jupiter.params.ParameterizedTest;
9394
import org.junit.jupiter.params.provider.CsvSource;
@@ -133,7 +134,12 @@ public class PaimonSinkITCase {
133134
private final TableId table1 = TableId.tableId("test", "table1");
134135
private final TableId table2 = TableId.tableId("test", "table2");
135136

136-
private static int checkpointId = 1;
137+
private int checkpointId = 1;
138+
139+
@BeforeEach
140+
public void resetCheckpointId() {
141+
checkpointId = 1;
142+
}
137143

138144
public static final String TEST_DATABASE = "test";
139145
private static final String HADOOP_CONF_DIR =
@@ -756,18 +762,17 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector
756762
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1"));
757763
}
758764

759-
private static void commit(
760-
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
765+
private void commit(PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
761766
throws IOException, InterruptedException {
762767
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
763768
writer.prepareCommit().stream()
764-
.map(PaimonSinkITCase::correctCheckpointId)
769+
.map(this::correctCheckpointId)
765770
.map(MockCommitRequestImpl::new)
766771
.collect(Collectors.toList());
767772
committer.commit(commitRequests);
768773
}
769774

770-
private static void writeAndCommit(
775+
private void writeAndCommit(
771776
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer, Event... events)
772777
throws IOException, InterruptedException {
773778
for (Event event : events) {
@@ -777,7 +782,7 @@ private static void writeAndCommit(
777782
commit(writer, committer);
778783
}
779784

780-
private static void writeAndCommit(
785+
private void writeAndCommit(
781786
BucketAssignOperator bucketAssignOperator,
782787
PaimonWriter<Event> writer,
783788
Committer<MultiTableCommittable> committer,
@@ -868,7 +873,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
868873
writer.flush(false);
869874
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
870875
writer.prepareCommit().stream()
871-
.map(PaimonSinkITCase::correctCheckpointId)
876+
.map(this::correctCheckpointId)
872877
.map(MockCommitRequestImpl::new)
873878
.collect(Collectors.toList());
874879
committer.commit(commitRequests);
@@ -900,7 +905,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
900905
// Checkpoint id start from 1
901906
committer.commit(
902907
writer.prepareCommit().stream()
903-
.map(PaimonSinkITCase::correctCheckpointId)
908+
.map(this::correctCheckpointId)
904909
.map(MockCommitRequestImpl::new)
905910
.collect(Collectors.toList()));
906911
}
@@ -1039,7 +1044,7 @@ private void runJobWithEvents(List<Event> events, boolean isBatchMode) throws Ex
10391044
env.execute("runJobWithEvents").getJobExecutionResult();
10401045
}
10411046

1042-
private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
1047+
private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
10431048
// update the right checkpointId for MultiTableCommittable
10441049
return new MultiTableCommittable(
10451050
committable.getDatabase(),

0 commit comments

Comments
 (0)