diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index b2a14a2c420..bed9608a585 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -336,6 +336,9 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets(); Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets(); + int minAssigners = MathUtils.min(numAssigners, totalTasksNumber); + int assignId = minAssigners == 1 ? 0 : currentTaskNumber; + int numChannels = minAssigners == 1 ? 1 : totalTasksNumber; LOGGER.debug("Successfully get table info {}", table); return new Tuple4<>( table.bucketMode(), @@ -344,9 +347,9 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) table.snapshotManager(), commitUser, table.store().newIndexFileHandler(), - totalTasksNumber, - MathUtils.min(numAssigners, totalTasksNumber), - currentTaskNumber, + numChannels, + minAssigners, + assignId, targetRowNum, maxBucketsNum), new RowPartitionKeyExtractor(table.schema()));