@@ -217,6 +217,7 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
217217 Map <DistributedPlanWorker , TPipelineFragmentParamsList > fragmentsGroupByWorker ,
218218 CoordinatorContext coordinatorContext ) {
219219 int numBackendsWithSink = 0 ;
220+ int totalSinkInstances = 0 ;
220221 for (PipelineDistributedPlan distributedPlan : distributedPlans ) {
221222 PlanFragment fragment = distributedPlan .getFragmentJob ().getFragment ();
222223 if (fragment .getSink () instanceof OlapTableSink ) {
@@ -225,11 +226,13 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
225226 .map (AssignedJob ::getAssignedWorker )
226227 .distinct ()
227228 .count ();
229+ totalSinkInstances += distributedPlan .getInstanceJobs ().size ();
228230 }
229231 }
230232
231233 ConnectContext connectContext = coordinatorContext .connectContext ;
232234 for (Entry <DistributedPlanWorker , TPipelineFragmentParamsList > kv : fragmentsGroupByWorker .entrySet ()) {
235+ DistributedPlanWorker worker = kv .getKey ();
233236 TPipelineFragmentParamsList fragments = kv .getValue ();
234237 for (TPipelineFragmentParams fragmentParams : fragments .getParamsList ()) {
235238 if (fragmentParams .getFragment ().getOutputSink ().getType () == TDataSinkType .OLAP_TABLE_SINK ) {
@@ -242,8 +245,130 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
242245 fragmentParams .setNumLocalSink (fragmentParams .getLocalParams ().size ());
243246 LOG .info ("num local sink for backend {} is {}" , fragmentParams .getBackendId (),
244247 fragmentParams .getNumLocalSink ());
248+
249+ // Assign per-BE starting bucket and bucket_be_id for random distribution
250+ assignRandomBucketPerBe (fragmentParams , worker .id (),
251+ numBackendsWithSink , totalSinkInstances );
252+ }
253+ }
254+ }
255+ }
256+
257+ /**
258+ * For random-distribution partitions, override load_tablet_idx and set bucket_be_id /
259+ * local_bucket_seqs so that each BE starts writing to one of its own local buckets and
260+ * can rotate within them on threshold.
261+ *
262+ * <p>The load_tablet_idx already stored in each TOlapTablePartition (set during
263+ * OlapTableSink.createPartitionParam) is re-used as a rotation offset so that consecutive
264+ * load tasks spread across different local buckets of the same BE.
265+ *
266+ * <p>The number of local buckets assigned per BE is capped by CalculateBucketNum:
267+ * <pre>activeBuckets = min(numBackendsWithSink, totalBucketNum, totalSinkInstances)</pre>
268+ * This prevents low-concurrency loads (e.g. stream load with a single instance) from
269+ * scattering data across more buckets than necessary.
270+ */
271+ private static void assignRandomBucketPerBe (TPipelineFragmentParams fragmentParams , long beId ,
272+ int numBackendsWithSink , int totalSinkInstances ) {
273+ org .apache .doris .thrift .TDataSink outputSink = fragmentParams .getFragment ().getOutputSink ();
274+ org .apache .doris .thrift .TOlapTableSink olapSink = outputSink .getOlapTableSink ();
275+ if (olapSink == null || olapSink .getPartition () == null
276+ || !olapSink .getPartition ().isSetPartitions ()) {
277+ return ;
278+ }
279+
280+ // Check whether any partition uses random distribution (identified by load_tablet_idx being set)
281+ boolean hasRandomPartition = olapSink .getPartition ().getPartitions ().stream ()
282+ .anyMatch (org .apache .doris .thrift .TOlapTablePartition ::isSetLoadTabletIdx );
283+ if (!hasRandomPartition ) {
284+ return ;
285+ }
286+
287+ // Build tablet_id -> BE IDs index from location
288+ org .apache .doris .thrift .TOlapTableLocationParam location = olapSink .getLocation ();
289+ if (location == null || !location .isSetTablets ()) {
290+ return ;
291+ }
292+ Map <Long , List <Long >> tabletToBeIds = new java .util .HashMap <>();
293+ for (org .apache .doris .thrift .TTabletLocation tabletLoc : location .getTablets ()) {
294+ tabletToBeIds .put (tabletLoc .getTabletId (), tabletLoc .getNodeIds ());
295+ }
296+
297+ // Deep-copy only the OlapTableSink so that per-BE modifications are isolated
298+ org .apache .doris .thrift .TOlapTableSink sinkCopy = olapSink .deepCopy ();
299+ outputSink .setOlapTableSink (sinkCopy );
300+
301+ for (org .apache .doris .thrift .TOlapTablePartition tPartition
302+ : sinkCopy .getPartition ().getPartitions ()) {
303+ if (!tPartition .isSetLoadTabletIdx ()) {
304+ continue ; // hash distribution – leave as-is
305+ }
306+
307+ int tabletIndex = (int ) tPartition .getLoadTabletIdx ();
308+ List <org .apache .doris .thrift .TOlapTableIndexTablets > indexes = tPartition .getIndexes ();
309+ if (indexes == null || indexes .isEmpty ()) {
310+ continue ;
311+ }
312+ List <Long > tablets = indexes .get (0 ).getTablets ();
313+ int numBuckets = tPartition .getNumBuckets ();
314+
315+ // Collect bucket indices assigned to this BE.
316+ //
317+ // For single-replica tablets, each bucket has exactly one entry in node_ids,
318+ // so the bucket naturally belongs to one BE.
319+ //
320+ // For multi-replica tablets, multiple BEs host the same bucket. To avoid all BEs
321+ // writing to the same bucket(s), we assign each bucket to exactly one BE using a
322+ // deterministic hash rule: sort node_ids, then give bucket[i] to the BE at position
323+ // (i % numReplicas) in the sorted list. This spreads buckets evenly across replica BEs
324+ // and is computed consistently on every BE without coordination.
325+ List <Integer > localBuckets = new ArrayList <>();
326+ for (int bucketIdx = 0 ; bucketIdx < tablets .size (); bucketIdx ++) {
327+ List <Long > beIds = tabletToBeIds .get (tablets .get (bucketIdx ));
328+ if (beIds == null || beIds .isEmpty ()) {
329+ continue ;
245330 }
331+ if (beIds .size () == 1 ) {
332+ // Single replica: the only BE is the owner
333+ if (beIds .get (0 ).equals (beId )) {
334+ localBuckets .add (bucketIdx );
335+ }
336+ } else {
337+ // Multi-replica: assign via hash so each bucket has exactly one owner BE
338+ List <Long > sorted = beIds .stream ()
339+ .sorted ()
340+ .collect (java .util .stream .Collectors .toList ());
341+ int pos = sorted .indexOf (beId );
342+ if (pos >= 0 && bucketIdx % sorted .size () == pos ) {
343+ localBuckets .add (bucketIdx );
344+ }
345+ }
346+ }
347+
348+ // CalculateBucketNum = min(numBackendsWithSink, totalBucketNum, totalSinkInstances)
349+ // determines how many distinct buckets are written to across all sinks.
350+ // Each BE gets at most ceil(activeBuckets / numBackendsWithSink) local buckets.
351+ // Example: stream load (totalSinkInstances=1, numBackendsWithSink=1, numBuckets=6)
352+ // → activeBuckets = 1, maxPerBe = 1 → only 1 bucket, no switching
353+ int activeBuckets = Math .min (numBackendsWithSink ,
354+ Math .min (numBuckets , totalSinkInstances ));
355+ int maxLocalBucketsPerBe = Math .max (1 ,
356+ (activeBuckets + numBackendsWithSink - 1 ) / numBackendsWithSink );
357+ if (localBuckets .size () > maxLocalBucketsPerBe ) {
358+ localBuckets = localBuckets .subList (0 , maxLocalBucketsPerBe );
246359 }
360+
361+ int loadTabletIdx ;
362+ if (!localBuckets .isEmpty ()) {
363+ loadTabletIdx = localBuckets .get (tabletIndex % localBuckets .size ());
364+ } else {
365+ // Fallback: BE has no local replica for this partition
366+ loadTabletIdx = tabletIndex % numBuckets ;
367+ }
368+
369+ tPartition .setLoadTabletIdx (loadTabletIdx );
370+ tPartition .setBucketBeId (beId );
371+ tPartition .setLocalBucketSeqs (localBuckets );
247372 }
248373 }
249374
0 commit comments