Skip to content

Commit 35540cb

Browse files
committed
adaptive random bucket load: per-BE bucket assignment with threshold-based rotation
1 parent c494892 commit 35540cb

8 files changed

Lines changed: 181 additions & 10 deletions

File tree

be/src/exec/sink/vtablet_finder.cpp

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <gen_cpp/FrontendService_types.h>
2323
#include <glog/logging.h>
2424

25+
#include <algorithm>
2526
#include <string>
2627
#include <utility>
2728

@@ -82,8 +83,39 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
8283

8384
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
8485
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
86+
} else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
87+
// Use per-batch caching same as FIND_TABLET_EVERY_BATCH
88+
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
89+
&_partition_to_tablet_map);
90+
91+
int64_t bytes_per_row = rows > 0 ? block->bytes() / rows : 0;
92+
for (auto& [partition, _] : _partition_to_tablet_map) {
93+
if (partition->indexes.empty()) {
94+
continue;
95+
}
96+
int64_t cur_bucket = partition->load_tablet_idx % partition->num_buckets;
97+
int64_t cur_tablet_id = partition->indexes[0].tablets[cur_bucket];
98+
_tablet_written_bytes[cur_tablet_id] += bytes_per_row * rows;
99+
100+
// Switch to the next local bucket when the threshold is reached.
101+
// local_bucket_seqs carries the exact set of buckets assigned to this BE by the FE,
102+
// which handles both single-replica and multi-replica correctly.
103+
const auto& seqs = partition->local_bucket_seqs;
104+
if (_tablet_written_bytes[cur_tablet_id] >= BUCKET_SWITCH_THRESHOLD_BYTES &&
105+
!seqs.empty()) {
106+
auto it = std::find(seqs.begin(), seqs.end(), static_cast<int32_t>(cur_bucket));
107+
if (it != seqs.end()) {
108+
++it;
109+
if (it == seqs.end()) {
110+
it = seqs.begin();
111+
}
112+
partition->load_tablet_idx = *it;
113+
}
114+
}
115+
}
116+
_partition_to_tablet_map.clear();
85117
} else {
86-
// for random distribution
118+
// FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK
87119
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
88120
&_partition_to_tablet_map);
89121
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {

be/src/exec/sink/vtablet_finder.h

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,21 @@ class OlapTabletFinder {
3737
// FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true,
3838
// which indicates that we should only compute tablet index in the corresponding partition once for the
3939
// whole time in olap table sink
40-
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
41-
42-
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
43-
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};
40+
// FIND_TABLET_RANDOM_BUCKET is used for random distribution info where FE assigns a starting
41+
// bucket per BE; BE rotates within its local buckets once per-tablet bytes exceed a threshold
42+
enum FindTabletMode {
43+
FIND_TABLET_EVERY_ROW,
44+
FIND_TABLET_EVERY_BATCH,
45+
FIND_TABLET_EVERY_SINK,
46+
FIND_TABLET_RANDOM_BUCKET
47+
};
48+
49+
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode,
50+
OlapTableLocationParam* location = nullptr)
51+
: _vpartition(vpartition),
52+
_find_tablet_mode(mode),
53+
_location(location),
54+
_filter_bitmap(1024) {};
4455

4556
Status find_tablets(RuntimeState* state, Block* block, int rows,
4657
std::vector<VOlapTablePartition*>& partitions,
@@ -64,11 +75,18 @@ class OlapTabletFinder {
6475

6576
Bitmap& filter_bitmap() { return _filter_bitmap; }
6677

78+
// Threshold for switching to the next local bucket in FIND_TABLET_RANDOM_BUCKET mode.
79+
static constexpr int64_t BUCKET_SWITCH_THRESHOLD_BYTES = 200LL * 1024 * 1024;
80+
6781
private:
6882
VOlapTablePartitionParam* _vpartition = nullptr;
6983
FindTabletMode _find_tablet_mode;
84+
// Reserved: was used for BE-side local-bucket lookup; FE now sends local_bucket_seqs directly
85+
OlapTableLocationParam* _location = nullptr;
7086
std::map<VOlapTablePartition*, int64_t> _partition_to_tablet_map;
7187
flat_hash_set<int64_t> _partition_ids;
88+
// tablet_id -> bytes written so far; used to decide when to rotate to the next local bucket
89+
std::map<int64_t, int64_t> _tablet_written_bytes;
7290

7391
int64_t _num_filtered_rows = 0;
7492
int64_t _num_immutable_partition_filtered_rows = 0;

be/src/exec/sink/writer/vtablet_writer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,11 +1546,11 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
15461546
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
15471547
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
15481548
} else {
1549-
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
1549+
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
15501550
}
15511551
}
15521552
_vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
1553-
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
1553+
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode, _location);
15541554
RETURN_IF_ERROR(_vpartition->init());
15551555

15561556
_state = state;

be/src/exec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,11 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
166166
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
167167
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
168168
} else {
169-
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
169+
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
170170
}
171171
}
172172
_vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
173-
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
173+
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode, _location);
174174
RETURN_IF_ERROR(_vpartition->init());
175175

176176
_state = state;

be/src/storage/tablet_info.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,10 +700,15 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
700700
part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
701701
part_result->id = t_part.id;
702702
part_result->is_mutable = t_part.is_mutable;
703-
// only load_to_single_tablet = true will set load_tablet_idx
704703
if (t_part.__isset.load_tablet_idx) {
705704
part_result->load_tablet_idx = t_part.load_tablet_idx;
706705
}
706+
if (t_part.__isset.bucket_be_id) {
707+
part_result->bucket_be_id = t_part.bucket_be_id;
708+
}
709+
if (t_part.__isset.local_bucket_seqs) {
710+
part_result->local_bucket_seqs = t_part.local_bucket_seqs;
711+
}
707712

708713
if (_is_in_partition) {
709714
for (const auto& keys : t_part.in_keys) {

be/src/storage/tablet_info.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ struct VOlapTablePartition {
164164
bool is_mutable;
165165
// -1 indicates partition with hash distribution
166166
int64_t load_tablet_idx = -1;
167+
// BE ID assigned by FE for local bucket rotation; -1 means not set
168+
int64_t bucket_be_id = -1;
169+
// Bucket indices (0-based) that FE assigned to this BE for rotation.
170+
// Empty means not set (fallback to full round-robin).
171+
std::vector<int32_t> local_bucket_seqs;
167172
int total_replica_num = 0;
168173
int load_required_replica_num = 0;
169174
// tablet_id -> set of backend_ids that have version gaps

fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
230230

231231
ConnectContext connectContext = coordinatorContext.connectContext;
232232
for (Entry<DistributedPlanWorker, TPipelineFragmentParamsList> kv : fragmentsGroupByWorker.entrySet()) {
233+
DistributedPlanWorker worker = kv.getKey();
233234
TPipelineFragmentParamsList fragments = kv.getValue();
234235
for (TPipelineFragmentParams fragmentParams : fragments.getParamsList()) {
235236
if (fragmentParams.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
@@ -242,11 +243,116 @@ private static void setParamsForOlapTableSink(List<PipelineDistributedPlan> dist
242243
fragmentParams.setNumLocalSink(fragmentParams.getLocalParams().size());
243244
LOG.info("num local sink for backend {} is {}", fragmentParams.getBackendId(),
244245
fragmentParams.getNumLocalSink());
246+
247+
// Assign per-BE starting bucket and bucket_be_id for random distribution
248+
assignRandomBucketPerBe(fragmentParams, worker.id());
245249
}
246250
}
247251
}
248252
}
249253

254+
/**
255+
* For random-distribution partitions, override load_tablet_idx and set bucket_be_id /
256+
* local_bucket_seqs so that each BE starts writing to one of its own local buckets and
257+
* can rotate within them once per-tablet bytes exceed the threshold.
258+
*
259+
* <p>local_bucket_seqs always contains all local buckets for this BE. The starting bucket
260+
* rotates across consecutive load tasks via tabletIndex; within a single load, the BE
261+
* switches to the next entry in the list only when the 200 MB threshold is reached.
262+
* Small loads (e.g. stream load) naturally stay on one bucket because the threshold is
263+
* never triggered.
264+
*/
265+
private static void assignRandomBucketPerBe(TPipelineFragmentParams fragmentParams, long beId) {
266+
org.apache.doris.thrift.TDataSink outputSink = fragmentParams.getFragment().getOutputSink();
267+
org.apache.doris.thrift.TOlapTableSink olapSink = outputSink.getOlapTableSink();
268+
if (olapSink == null || olapSink.getPartition() == null
269+
|| !olapSink.getPartition().isSetPartitions()) {
270+
return;
271+
}
272+
273+
// Check whether any partition uses random distribution (identified by load_tablet_idx being set)
274+
boolean hasRandomPartition = olapSink.getPartition().getPartitions().stream()
275+
.anyMatch(org.apache.doris.thrift.TOlapTablePartition::isSetLoadTabletIdx);
276+
if (!hasRandomPartition) {
277+
return;
278+
}
279+
280+
// Build tablet_id -> BE IDs index from location
281+
org.apache.doris.thrift.TOlapTableLocationParam location = olapSink.getLocation();
282+
if (location == null || !location.isSetTablets()) {
283+
return;
284+
}
285+
Map<Long, List<Long>> tabletToBeIds = new java.util.HashMap<>();
286+
for (org.apache.doris.thrift.TTabletLocation tabletLoc : location.getTablets()) {
287+
tabletToBeIds.put(tabletLoc.getTabletId(), tabletLoc.getNodeIds());
288+
}
289+
290+
// Deep-copy only the OlapTableSink so that per-BE modifications are isolated
291+
org.apache.doris.thrift.TOlapTableSink sinkCopy = olapSink.deepCopy();
292+
outputSink.setOlapTableSink(sinkCopy);
293+
294+
for (org.apache.doris.thrift.TOlapTablePartition tPartition
295+
: sinkCopy.getPartition().getPartitions()) {
296+
if (!tPartition.isSetLoadTabletIdx()) {
297+
continue; // hash distribution – leave as-is
298+
}
299+
300+
int tabletIndex = (int) tPartition.getLoadTabletIdx();
301+
List<org.apache.doris.thrift.TOlapTableIndexTablets> indexes = tPartition.getIndexes();
302+
if (indexes == null || indexes.isEmpty()) {
303+
continue;
304+
}
305+
List<Long> tablets = indexes.get(0).getTablets();
306+
int numBuckets = tPartition.getNumBuckets();
307+
308+
// Collect bucket indices assigned to this BE.
309+
//
310+
// For single-replica tablets, each bucket has exactly one entry in node_ids,
311+
// so the bucket naturally belongs to one BE.
312+
//
313+
// For multi-replica tablets, multiple BEs host the same bucket. To avoid all BEs
314+
// writing to the same bucket(s), we assign each bucket to exactly one BE using a
315+
// deterministic hash rule: sort node_ids, then give bucket[i] to the BE at position
316+
// (i % numReplicas) in the sorted list. This spreads buckets evenly across replica BEs
317+
// and is computed consistently on every BE without coordination.
318+
List<Integer> localBuckets = new ArrayList<>();
319+
for (int bucketIdx = 0; bucketIdx < tablets.size(); bucketIdx++) {
320+
List<Long> beIds = tabletToBeIds.get(tablets.get(bucketIdx));
321+
if (beIds == null || beIds.isEmpty()) {
322+
continue;
323+
}
324+
if (beIds.size() == 1) {
325+
// Single replica: the only BE is the owner
326+
if (beIds.get(0).equals(beId)) {
327+
localBuckets.add(bucketIdx);
328+
}
329+
} else {
330+
// Multi-replica: assign via hash so each bucket has exactly one owner BE
331+
List<Long> sorted = beIds.stream()
332+
.sorted()
333+
.collect(java.util.stream.Collectors.toList());
334+
int pos = sorted.indexOf(beId);
335+
if (pos >= 0 && bucketIdx % sorted.size() == pos) {
336+
localBuckets.add(bucketIdx);
337+
}
338+
}
339+
}
340+
341+
if (!localBuckets.isEmpty()) {
342+
int loadTabletIdx = localBuckets.get(tabletIndex % localBuckets.size());
343+
tPartition.setLoadTabletIdx(loadTabletIdx);
344+
tPartition.setBucketBeId(beId);
345+
// Send the full local bucket list; BE rotates through it in order when the
346+
// per-tablet write threshold is reached.
347+
tPartition.setLocalBucketSeqs(localBuckets);
348+
} else {
349+
// Fallback: BE has no local replica for this partition
350+
tPartition.setLoadTabletIdx(tabletIndex % numBuckets);
351+
// Leave bucket_be_id / local_bucket_seqs unset → no switching
352+
}
353+
}
354+
}
355+
250356
private static Multiset<DistributedPlanWorker> computeInstanceNumPerWorker(
251357
List<PipelineDistributedPlan> distributedPlans) {
252358
Multiset<DistributedPlanWorker> workerCounter = LinkedHashMultiset.create();

gensrc/thrift/Descriptors.thrift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,11 @@ struct TOlapTablePartition {
285285
// tablet_id -> list of backend_ids that have version gaps (lastFailedVersion >= 0)
286286
// used by BE to exclude these backends from success counting in majority write
287287
14: optional map<i64, list<i64>> tablet_version_gap_backends
288+
// FE-assigned BE ID for local bucket rotation in FIND_TABLET_RANDOM_BUCKET mode
289+
15: optional i64 bucket_be_id
290+
// FE-assigned local bucket indices for this BE; used in FIND_TABLET_RANDOM_BUCKET mode
291+
// to rotate only within the buckets owned by this BE (handles both single and multi-replica)
292+
16: optional list<i32> local_bucket_seqs
288293
}
289294

290295
struct TOlapTablePartitionParam {

0 commit comments

Comments
 (0)