From 6a5e35ba37fe7e96e68cdae4e4169a37e4dc4d80 Mon Sep 17 00:00:00 2001 From: zhaoruifeng01 <130582631+zhaoruifeng01@users.noreply.github.com> Date: Thu, 26 Mar 2026 17:00:34 +0800 Subject: [PATCH] Fixes a bug where Flink CDC fails to write to Paimon dynamic bucket tables when dynamic-bucket.initial-buckets=1 and Flink parallelism > 1 --- .../paimon/sink/v2/bucket/BucketAssignOperator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index b2a14a2c420..bed9608a585 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -336,6 +336,9 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets(); Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets(); + int minAssigners = MathUtils.min(numAssigners, totalTasksNumber); + int assignId = minAssigners == 1 ? 0 : currentTaskNumber; + int numChannels = minAssigners == 1 ? 1 : totalTasksNumber; LOGGER.debug("Successfully get table info {}", table); return new Tuple4<>( table.bucketMode(), @@ -344,9 +347,9 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) table.snapshotManager(), commitUser, table.store().newIndexFileHandler(), - totalTasksNumber, - MathUtils.min(numAssigners, totalTasksNumber), - currentTaskNumber, + numChannels, + minAssigners, + assignId, targetRowNum, maxBucketsNum), new RowPartitionKeyExtractor(table.schema()));