Skip to content

Commit f7e1c09

Browse files
committed
update
1 parent a7d65e6 commit f7e1c09

1 file changed

Lines changed: 19 additions & 2 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,12 +662,15 @@ public static Map<Long, Map<Long, AdaptiveBucketAssignment>> computeAdaptiveRand
662662
+ "sinkBackendId={}, selectedBucketSeqs={}, beToBucketSeqs={}",
663663
bucketSeq, partition.getId(), sinkBackendId, selectedBucketSeqs, beToBucketSeqs);
664664
}
665+
List<Integer> localBucketSeqs = rotateBucketSeqsForStartBucket(
666+
beToBucketSeqs.get(bucketBeId), bucketSeq);
665667
assignments.get(sinkBackendId).put(partition.getId(),
666-
new AdaptiveBucketAssignment(bucketBeId, bucketSeq, Collections.singletonList(bucketSeq)));
668+
new AdaptiveBucketAssignment(bucketBeId, bucketSeq, localBucketSeqs));
667669
bucketUseCounts.merge(bucketSeq, 1, Integer::sum);
668670
if (sinkAssignments != null) {
669671
sinkAssignments.put(sinkBackendId,
670-
"bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId);
672+
"bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId
673+
+ ",localBucketSeqs=" + localBucketSeqs);
671674
}
672675
}
673676
if (sinkAssignments != null) {
@@ -835,6 +838,20 @@ private static int selectLeastUsedBucketSeq(List<Integer> bucketSeqs, Map<Intege
835838
return selectedBucketSeq;
836839
}
837840

841+
private static List<Integer> rotateBucketSeqsForStartBucket(List<Integer> bucketSeqs, int startBucketSeq) {
842+
if (bucketSeqs == null || bucketSeqs.isEmpty()) {
843+
return Collections.singletonList(startBucketSeq);
844+
}
845+
int startIdx = bucketSeqs.indexOf(startBucketSeq);
846+
Preconditions.checkState(startIdx >= 0,
847+
"start bucket %s must exist in bucketSeqs %s", startBucketSeq, bucketSeqs);
848+
List<Integer> rotatedBucketSeqs = new ArrayList<>(bucketSeqs.size());
849+
for (int offset = 0; offset < bucketSeqs.size(); offset++) {
850+
rotatedBucketSeqs.add(bucketSeqs.get((startIdx + offset) % bucketSeqs.size()));
851+
}
852+
return rotatedBucketSeqs;
853+
}
854+
838855
private TOlapTablePartitionParam createPartition(long dbId, OlapTable table)
839856
throws UserException {
840857
TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();

0 commit comments

Comments
 (0)