diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index d3277e3aa11..53764cf5a30 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -133,7 +133,7 @@ public class PaimonSinkITCase { private final TableId table1 = TableId.tableId("test", "table1"); private final TableId table2 = TableId.tableId("test", "table2"); - private static int checkpointId = 1; + private int checkpointId = 1; public static final String TEST_DATABASE = "test"; private static final String HADOOP_CONF_DIR = @@ -756,18 +756,17 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1")); } - private static void commit( - PaimonWriter writer, Committer committer) + private void commit(PaimonWriter writer, Committer committer) throws IOException, InterruptedException { Collection> commitRequests = writer.prepareCommit().stream() - .map(PaimonSinkITCase::correctCheckpointId) + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); } - private static void writeAndCommit( + private void writeAndCommit( PaimonWriter writer, Committer committer, Event... events) throws IOException, InterruptedException { for (Event event : events) { @@ -777,7 +776,7 @@ private static void writeAndCommit( commit(writer, committer); } - private static void writeAndCommit( + private void writeAndCommit( BucketAssignOperator bucketAssignOperator, PaimonWriter writer, Committer committer, @@ -868,7 +867,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele writer.flush(false); Collection> commitRequests = writer.prepareCommit().stream() - .map(PaimonSinkITCase::correctCheckpointId) + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -900,7 +899,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele // Checkpoint id start from 1 committer.commit( writer.prepareCommit().stream() - .map(PaimonSinkITCase::correctCheckpointId) + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList())); } @@ -1039,7 +1038,7 @@ private void runJobWithEvents(List events, boolean isBatchMode) throws Ex env.execute("runJobWithEvents").getJobExecutionResult(); } - private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { + private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { // update the right checkpointId for MultiTableCommittable return new MultiTableCommittable( committable.getDatabase(),