diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java index 5d28fe09181..068d908875c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java @@ -98,7 +98,12 @@ public StatefulSinkWriter restoreWriter( context.getRestoredCheckpointId() .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); Preconditions.checkNotNull(paimonWriterStates); - String storedCommitUser = paimonWriterStates.iterator().next().getCommitUser(); + String storedCommitUser; + if (paimonWriterStates.isEmpty()) { + storedCommitUser = this.commitUser; + } else { + storedCommitUser = paimonWriterStates.iterator().next().getCommitUser(); + } return new PaimonWriter<>( catalogOptions, context.metricGroup(),