@@ -230,6 +230,7 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
230230
231231 ConnectContext connectContext = coordinatorContext .connectContext ;
232232 for (Entry <DistributedPlanWorker , TPipelineFragmentParamsList > kv : fragmentsGroupByWorker .entrySet ()) {
233+ DistributedPlanWorker worker = kv .getKey ();
233234 TPipelineFragmentParamsList fragments = kv .getValue ();
234235 for (TPipelineFragmentParams fragmentParams : fragments .getParamsList ()) {
235236 if (fragmentParams .getFragment ().getOutputSink ().getType () == TDataSinkType .OLAP_TABLE_SINK ) {
@@ -242,11 +243,116 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
242243 fragmentParams .setNumLocalSink (fragmentParams .getLocalParams ().size ());
243244 LOG .info ("num local sink for backend {} is {}" , fragmentParams .getBackendId (),
244245 fragmentParams .getNumLocalSink ());
246+
247+ // Assign per-BE starting bucket and bucket_be_id for random distribution
248+ assignRandomBucketPerBe (fragmentParams , worker .id ());
245249 }
246250 }
247251 }
248252 }
249253
254+ /**
255+ * For random-distribution partitions, override load_tablet_idx and set bucket_be_id /
256+ * local_bucket_seqs so that each BE starts writing to one of its own local buckets and
257+ * can rotate within them once per-tablet bytes exceed the threshold.
258+ *
259+ * <p>local_bucket_seqs always contains all local buckets for this BE. The starting bucket
260+ * rotates across consecutive load tasks via tabletIndex; within a single load, the BE
261+ * switches to the next entry in the list only when the 200 MB threshold is reached.
262+ * Small loads (e.g. stream load) naturally stay on one bucket because the threshold is
263+ * never triggered.
264+ */
265+ private static void assignRandomBucketPerBe (TPipelineFragmentParams fragmentParams , long beId ) {
266+ org .apache .doris .thrift .TDataSink outputSink = fragmentParams .getFragment ().getOutputSink ();
267+ org .apache .doris .thrift .TOlapTableSink olapSink = outputSink .getOlapTableSink ();
268+ if (olapSink == null || olapSink .getPartition () == null
269+ || !olapSink .getPartition ().isSetPartitions ()) {
270+ return ;
271+ }
272+
273+ // Check whether any partition uses random distribution (identified by load_tablet_idx being set)
274+ boolean hasRandomPartition = olapSink .getPartition ().getPartitions ().stream ()
275+ .anyMatch (org .apache .doris .thrift .TOlapTablePartition ::isSetLoadTabletIdx );
276+ if (!hasRandomPartition ) {
277+ return ;
278+ }
279+
280+ // Build tablet_id -> BE IDs index from location
281+ org .apache .doris .thrift .TOlapTableLocationParam location = olapSink .getLocation ();
282+ if (location == null || !location .isSetTablets ()) {
283+ return ;
284+ }
285+ Map <Long , List <Long >> tabletToBeIds = new java .util .HashMap <>();
286+ for (org .apache .doris .thrift .TTabletLocation tabletLoc : location .getTablets ()) {
287+ tabletToBeIds .put (tabletLoc .getTabletId (), tabletLoc .getNodeIds ());
288+ }
289+
290+ // Deep-copy only the OlapTableSink so that per-BE modifications are isolated
291+ org .apache .doris .thrift .TOlapTableSink sinkCopy = olapSink .deepCopy ();
292+ outputSink .setOlapTableSink (sinkCopy );
293+
294+ for (org .apache .doris .thrift .TOlapTablePartition tPartition
295+ : sinkCopy .getPartition ().getPartitions ()) {
296+ if (!tPartition .isSetLoadTabletIdx ()) {
297+ continue ; // hash distribution – leave as-is
298+ }
299+
300+ int tabletIndex = (int ) tPartition .getLoadTabletIdx ();
301+ List <org .apache .doris .thrift .TOlapTableIndexTablets > indexes = tPartition .getIndexes ();
302+ if (indexes == null || indexes .isEmpty ()) {
303+ continue ;
304+ }
305+ List <Long > tablets = indexes .get (0 ).getTablets ();
306+ int numBuckets = tPartition .getNumBuckets ();
307+
308+ // Collect bucket indices assigned to this BE.
309+ //
310+ // For single-replica tablets, each bucket has exactly one entry in node_ids,
311+ // so the bucket naturally belongs to one BE.
312+ //
313+ // For multi-replica tablets, multiple BEs host the same bucket. To avoid all BEs
314+ // writing to the same bucket(s), we assign each bucket to exactly one BE using a
315+ // deterministic hash rule: sort node_ids, then give bucket[i] to the BE at position
316+ // (i % numReplicas) in the sorted list. This spreads buckets evenly across replica BEs
317+ // and is computed consistently on every BE without coordination.
318+ List <Integer > localBuckets = new ArrayList <>();
319+ for (int bucketIdx = 0 ; bucketIdx < tablets .size (); bucketIdx ++) {
320+ List <Long > beIds = tabletToBeIds .get (tablets .get (bucketIdx ));
321+ if (beIds == null || beIds .isEmpty ()) {
322+ continue ;
323+ }
324+ if (beIds .size () == 1 ) {
325+ // Single replica: the only BE is the owner
326+ if (beIds .get (0 ).equals (beId )) {
327+ localBuckets .add (bucketIdx );
328+ }
329+ } else {
330+ // Multi-replica: assign via hash so each bucket has exactly one owner BE
331+ List <Long > sorted = beIds .stream ()
332+ .sorted ()
333+ .collect (java .util .stream .Collectors .toList ());
334+ int pos = sorted .indexOf (beId );
335+ if (pos >= 0 && bucketIdx % sorted .size () == pos ) {
336+ localBuckets .add (bucketIdx );
337+ }
338+ }
339+ }
340+
341+ if (!localBuckets .isEmpty ()) {
342+ int loadTabletIdx = localBuckets .get (tabletIndex % localBuckets .size ());
343+ tPartition .setLoadTabletIdx (loadTabletIdx );
344+ tPartition .setBucketBeId (beId );
345+ // Send the full local bucket list; BE rotates through it in order when the
346+ // per-tablet write threshold is reached.
347+ tPartition .setLocalBucketSeqs (localBuckets );
348+ } else {
349+ // Fallback: BE has no local replica for this partition
350+ tPartition .setLoadTabletIdx (tabletIndex % numBuckets );
351+ // Leave bucket_be_id / local_bucket_seqs unset → no switching
352+ }
353+ }
354+ }
355+
250356 private static Multiset <DistributedPlanWorker > computeInstanceNumPerWorker (
251357 List <PipelineDistributedPlan > distributedPlans ) {
252358 Multiset <DistributedPlanWorker > workerCounter = LinkedHashMultiset .create ();
0 commit comments