Skip to content

Commit d609f36

Browse files
committed
adaptive random bucket load: per-BE bucket assignment with threshold-based rotation
1 parent c494892 commit d609f36

9 files changed

Lines changed: 282 additions & 11 deletions

File tree

be/src/exec/sink/vtablet_finder.cpp

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <gen_cpp/FrontendService_types.h>
2323
#include <glog/logging.h>
2424

25+
#include <algorithm>
2526
#include <string>
2627
#include <utility>
2728

@@ -82,8 +83,69 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
8283

8384
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
8485
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
86+
} else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
87+
// Use per-batch caching: all rows in the same partition go to the same tablet this batch.
88+
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
89+
&_partition_to_tablet_map);
90+
91+
// Count actual rows per partition so we don't credit each partition with the
92+
// full block size when a block spans multiple partitions.
93+
flat_hash_map<VOlapTablePartition*, int64_t> partition_row_counts;
94+
for (uint32_t qi : qualified_rows) {
95+
partition_row_counts[partitions[qi]]++;
96+
}
97+
98+
int64_t bytes_per_row = rows > 0 ? block->bytes() / rows : 0;
99+
for (auto& [partition, _] : _partition_to_tablet_map) {
100+
if (partition->indexes.empty()) {
101+
continue;
102+
}
103+
const auto& seqs = partition->local_bucket_seqs;
104+
105+
// Maintain _partition_seq_pos so rotation is O(1) after the first batch.
106+
// On first encounter, find the current bucket's position in seqs (one-time O(n)).
107+
// On subsequent batches the cached position is reused; the sync below is re-checked only while seq_pos is 0.
108+
int32_t& seq_pos = _partition_seq_pos[partition]; // default-initialised to 0
109+
if (seq_pos == 0 && !seqs.empty()) {
110+
// Sync seq_pos with the FE-assigned starting bucket. If load_tablet_idx happens
111+
// to already be seqs[0] this is a no-op (seq_pos stays 0), otherwise we find
112+
// the correct starting position once.
113+
int32_t cur_bucket_val =
114+
static_cast<int32_t>(partition->load_tablet_idx % partition->num_buckets);
115+
if (seqs[0] != cur_bucket_val) {
116+
auto it = std::find(seqs.begin(), seqs.end(), cur_bucket_val);
117+
if (it != seqs.end()) {
118+
seq_pos = static_cast<int32_t>(it - seqs.begin());
119+
}
120+
}
121+
}
122+
123+
int64_t cur_bucket = seqs.empty()
124+
? partition->load_tablet_idx % partition->num_buckets
125+
: seqs[seq_pos];
126+
int64_t cur_tablet_id = partition->indexes[0].tablets[cur_bucket];
127+
128+
// Credit only the rows that actually went to this partition.
129+
int64_t part_rows = 0;
130+
if (auto it = partition_row_counts.find(partition); it != partition_row_counts.end()) {
131+
part_rows = it->second;
132+
}
133+
_tablet_written_bytes[cur_tablet_id] += bytes_per_row * part_rows;
134+
135+
// Switch to the next local bucket when the threshold is reached.
136+
// local_bucket_seqs carries the exact set of buckets assigned to this BE by the FE.
137+
if (!seqs.empty() &&
138+
_tablet_written_bytes[cur_tablet_id] >= BUCKET_SWITCH_THRESHOLD_BYTES) {
139+
seq_pos = (seq_pos + 1) % static_cast<int32_t>(seqs.size());
140+
partition->load_tablet_idx = seqs[seq_pos];
141+
VLOG(2) << "FIND_TABLET_RANDOM_BUCKET: partition=" << partition->id
142+
<< " rotate bucket " << cur_bucket << " -> " << seqs[seq_pos]
143+
<< " after " << _tablet_written_bytes[cur_tablet_id] << " bytes";
144+
}
145+
}
146+
_partition_to_tablet_map.clear();
85147
} else {
86-
// for random distribution
148+
// FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK
87149
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
88150
&_partition_to_tablet_map);
89151
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {

be/src/exec/sink/vtablet_finder.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ class OlapTabletFinder {
3737
// FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet is set to true,
3838
// which indicates that we should only compute tablet index in the corresponding partition once for the
3939
// whole time in olap table sink
40-
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
40+
// FIND_TABLET_RANDOM_BUCKET is used for random distribution info where FE assigns a starting
41+
// bucket per BE; BE rotates within its local buckets once per-tablet bytes exceed a threshold
42+
enum FindTabletMode {
43+
FIND_TABLET_EVERY_ROW,
44+
FIND_TABLET_EVERY_BATCH,
45+
FIND_TABLET_EVERY_SINK,
46+
FIND_TABLET_RANDOM_BUCKET
47+
};
4148

4249
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
4350
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};
@@ -64,11 +71,19 @@ class OlapTabletFinder {
6471

6572
Bitmap& filter_bitmap() { return _filter_bitmap; }
6673

74+
// Threshold for switching to the next local bucket in FIND_TABLET_RANDOM_BUCKET mode.
75+
static constexpr int64_t BUCKET_SWITCH_THRESHOLD_BYTES = 200LL * 1024 * 1024;
76+
6777
private:
6878
VOlapTablePartitionParam* _vpartition = nullptr;
6979
FindTabletMode _find_tablet_mode;
7080
std::map<VOlapTablePartition*, int64_t> _partition_to_tablet_map;
7181
flat_hash_set<int64_t> _partition_ids;
82+
// tablet_id -> estimated cumulative bytes written (block average bytes-per-row times the rows routed to the partition); used to decide when to rotate to the next local bucket
83+
flat_hash_map<int64_t, int64_t> _tablet_written_bytes;
84+
// partition -> current index into local_bucket_seqs; initialized lazily on first batch.
85+
// Storing the position (not the bucket value) makes rotation O(1) after the first lookup.
86+
flat_hash_map<VOlapTablePartition*, int32_t> _partition_seq_pos;
7287

7388
int64_t _num_filtered_rows = 0;
7489
int64_t _num_immutable_partition_filtered_rows = 0;

be/src/exec/sink/writer/vtablet_writer.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,12 +1538,25 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
15381538
if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) {
15391539
_send_batch_parallelism = table_sink.send_batch_parallelism;
15401540
}
1541-
// if distributed column list is empty, we can ensure that tablet is with random distribution info
1542-
// and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
1543-
// for the whole olap table sink
1541+
// If distributed column list is empty, the table uses random distribution.
1542+
// Mode priority (highest to lowest):
1543+
// 1. FIND_TABLET_RANDOM_BUCKET: FE set local_bucket_seqs on at least one partition,
1544+
// meaning enable_adaptive_random_bucket_load is ON. This overrides load_to_single_tablet
1545+
// because the local-bucket strategy already constrains writes to a small set of tablets.
1546+
// 2. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode).
1547+
// 3. FIND_TABLET_EVERY_BATCH: default round-robin per batch.
15441548
auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
15451549
if (table_sink.partition.distributed_columns.empty()) {
1546-
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
1550+
bool use_adaptive_random_bucket = false;
1551+
for (const auto& part : table_sink.partition.partitions) {
1552+
if (part.__isset.local_bucket_seqs && !part.local_bucket_seqs.empty()) {
1553+
use_adaptive_random_bucket = true;
1554+
break;
1555+
}
1556+
}
1557+
if (use_adaptive_random_bucket) {
1558+
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
1559+
} else if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
15471560
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
15481561
} else {
15491562
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;

be/src/exec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,25 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
158158
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
159159
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
160160

161-
// if distributed column list is empty, we can ensure that tablet is with random distribution info
162-
// and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
163-
// for the whole olap table sink
161+
// If distributed column list is empty, the table uses random distribution.
162+
// Mode priority (highest to lowest):
163+
// 1. FIND_TABLET_RANDOM_BUCKET: FE set local_bucket_seqs on at least one partition,
164+
// meaning enable_adaptive_random_bucket_load is ON. This overrides load_to_single_tablet
165+
// because the local-bucket strategy already constrains writes to a small set of tablets.
166+
// 2. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode).
167+
// 3. FIND_TABLET_EVERY_BATCH: default round-robin per batch.
164168
auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
165169
if (table_sink.partition.distributed_columns.empty()) {
166-
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
170+
bool use_adaptive_random_bucket = false;
171+
for (const auto& part : table_sink.partition.partitions) {
172+
if (part.__isset.local_bucket_seqs && !part.local_bucket_seqs.empty()) {
173+
use_adaptive_random_bucket = true;
174+
break;
175+
}
176+
}
177+
if (use_adaptive_random_bucket) {
178+
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
179+
} else if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
167180
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
168181
} else {
169182
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;

be/src/storage/tablet_info.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,10 +700,15 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
700700
part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
701701
part_result->id = t_part.id;
702702
part_result->is_mutable = t_part.is_mutable;
703-
// only load_to_single_tablet = true will set load_tablet_idx
704703
if (t_part.__isset.load_tablet_idx) {
705704
part_result->load_tablet_idx = t_part.load_tablet_idx;
706705
}
706+
if (t_part.__isset.bucket_be_id) {
707+
part_result->bucket_be_id = t_part.bucket_be_id;
708+
}
709+
if (t_part.__isset.local_bucket_seqs) {
710+
part_result->local_bucket_seqs = t_part.local_bucket_seqs;
711+
}
707712

708713
if (_is_in_partition) {
709714
for (const auto& keys : t_part.in_keys) {

be/src/storage/tablet_info.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ struct VOlapTablePartition {
164164
bool is_mutable;
165165
// -1 indicates partition with hash distribution
166166
int64_t load_tablet_idx = -1;
167+
// BE ID assigned by FE for local bucket rotation; -1 means not set
168+
int64_t bucket_be_id = -1;
169+
// Bucket indices (0-based) that FE assigned to this BE for rotation.
170+
// Empty means not set (fallback to full round-robin).
171+
std::vector<int32_t> local_bucket_seqs;
167172
int total_replica_num = 0;
168173
int load_required_replica_num = 0;
169174
// tablet_id -> set of backend_ids that have version gaps

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3610,4 +3610,15 @@ public static int metaServiceRpcRetryTimes() {
36103610
+ "obtaining partition version information when calculating the delete bitmap. Enabled "
36113611
+ "by default."})
36123612
public static boolean calc_delete_bitmap_get_versions_waiting_for_pending_txns = true;
3613+
3614+
@ConfField(mutable = true, masterOnly = true, description = {
3615+
"Whether to enable adaptive random bucket load. When enabled, the FE assigns each BE only the "
3616+
+ "buckets whose primary replica it hosts (local_bucket_seqs), and the BE rotates across "
3617+
+ "those buckets once per-tablet write volume exceeds the threshold (default 200 MB). "
3618+
+ "This reduces import memory pressure and improves throughput for random-distribution "
3619+
+ "tables. When enabled, load_to_single_tablet is also subsumed by this strategy.",
3620+
"是否启用自适应随机桶导入。开启后 FE 为每个 BE 仅分配其持有主副本的桶(local_bucket_seqs),"
3621+
+ "BE 在单个 tablet 写入量超过阈值(默认 200 MB)后在本地桶之间轮转。"
3622+
+ "可降低导入内存压力并提升随机分桶表的吞吐量。开启后 load_to_single_tablet 也走新逻辑。"})
3623+
public static boolean enable_adaptive_random_bucket_load = true;
36133624
}

0 commit comments

Comments
 (0)