Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public class PaimonSinkITCase {
private final TableId table1 = TableId.tableId("test", "table1");
private final TableId table2 = TableId.tableId("test", "table2");

private static int checkpointId = 1;
private int checkpointId = 1;

public static final String TEST_DATABASE = "test";
private static final String HADOOP_CONF_DIR =
Expand Down Expand Up @@ -756,18 +756,17 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1"));
}

private static void commit(
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
private void commit(PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer)
throws IOException, InterruptedException {
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(PaimonSinkITCase::correctCheckpointId)
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
}

private static void writeAndCommit(
private void writeAndCommit(
PaimonWriter<Event> writer, Committer<MultiTableCommittable> committer, Event... events)
throws IOException, InterruptedException {
for (Event event : events) {
Expand All @@ -777,7 +776,7 @@ private static void writeAndCommit(
commit(writer, committer);
}

private static void writeAndCommit(
private void writeAndCommit(
BucketAssignOperator bucketAssignOperator,
PaimonWriter<Event> writer,
Committer<MultiTableCommittable> committer,
Expand Down Expand Up @@ -868,7 +867,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(PaimonSinkITCase::correctCheckpointId)
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -900,7 +899,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
// Checkpoint id start from 1
committer.commit(
writer.prepareCommit().stream()
.map(PaimonSinkITCase::correctCheckpointId)
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList()));
}
Expand Down Expand Up @@ -1039,7 +1038,7 @@ private void runJobWithEvents(List<Event> events, boolean isBatchMode) throws Ex
env.execute("runJobWithEvents").getJobExecutionResult();
}

private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
// update the right checkpointId for MultiTableCommittable
return new MultiTableCommittable(
committable.getDatabase(),
Expand Down
Loading