Skip to content

Commit 8d2f063

Browse files
committed
introduce adaptive random bucket load routing
1 parent 98dec70 commit 8d2f063

35 files changed

Lines changed: 1290 additions & 61 deletions

be/src/cloud/cloud_delta_writer.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "cloud/cloud_storage_engine.h"
2323
#include "cloud/config.h"
2424
#include "load/delta_writer/delta_writer.h"
25+
#include "load/memtable/memtable_memory_limiter.h"
26+
#include "runtime/exec_env.h"
2527
#include "runtime/thread_context.h"
2628

2729
namespace doris {
@@ -64,10 +66,18 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
6466
return cloud::bthread_fork_join(tasks, 10);
6567
}
6668

67-
Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
69+
Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs,
70+
bool* memtable_flushed) {
71+
if (memtable_flushed != nullptr) {
72+
*memtable_flushed = false;
73+
}
6874
if (row_idxs.empty()) [[unlikely]] {
6975
return Status::OK();
7076
}
77+
if (_req.enable_table_memtable_backpressure) {
78+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_table_memtable_backpressure(
79+
nullptr, table_id());
80+
}
7181
std::lock_guard lock(_mtx);
7282
CHECK(_is_init || _is_cancelled);
7383
{
@@ -77,7 +87,7 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>&
7787
std::this_thread::sleep_for(std::chrono::milliseconds(10));
7888
}
7989
}
80-
return _memtable_writer->write(block, row_idxs);
90+
return _memtable_writer->write(block, row_idxs, memtable_flushed);
8191
}
8292

8393
Status CloudDeltaWriter::close() {
@@ -86,6 +96,11 @@ Status CloudDeltaWriter::close() {
8696
return _memtable_writer->close();
8797
}
8898

99+
Status CloudDeltaWriter::flush_memtable_async() {
100+
std::lock_guard lock(_mtx);
101+
return BaseDeltaWriter::flush_memtable_async();
102+
}
103+
89104
Status CloudDeltaWriter::cancel_with_status(const Status& st) {
90105
std::lock_guard lock(_mtx);
91106
return BaseDeltaWriter::cancel_with_status(st);

be/src/cloud/cloud_delta_writer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
3333
const UniqueId& load_id);
3434
~CloudDeltaWriter() override;
3535

36-
Status write(const Block* block, const DorisVector<uint32_t>& row_idxs) override;
36+
Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
37+
bool* memtable_flushed = nullptr) override;
3738

3839
Status close() override;
3940

41+
Status flush_memtable_async() override;
42+
4043
Status cancel_with_status(const Status& st) override;
4144

4245
Status build_rowset() override;

be/src/cloud/cloud_tablets_channel.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,24 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
6060
return Status::OK();
6161
}
6262

63+
if (request.is_receiver_side_random_bucket()) {
64+
std::unordered_map<int64_t, DorisVector<uint32_t>> partition_to_rowidxs;
65+
_build_partition_to_rowidxs_for_receiver_side_random_bucket(request, &partition_to_rowidxs);
66+
if (!partition_to_rowidxs.empty()) {
67+
std::unordered_set<int64_t> partition_ids;
68+
partition_ids.reserve(partition_to_rowidxs.size());
69+
for (const auto& [partition_id, _] : partition_to_rowidxs) {
70+
partition_ids.insert(partition_id);
71+
}
72+
{
73+
std::lock_guard<std::mutex> l(_tablet_writers_lock);
74+
RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
75+
}
76+
}
77+
return _write_block_data_for_receiver_side_random_bucket(request, cur_seq,
78+
partition_to_rowidxs, response);
79+
}
80+
6381
std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
6482
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
6583

be/src/common/config.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
726726
// max write buffer size before flush, default 200MB
727727
DEFINE_mInt64(write_buffer_size, "209715200");
728728
DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
729+
// Whether random bucket load rotates to the next local bucket when memtable flushes.
730+
DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true");
729731
// max buffer size used in memtable for the aggregated table, default 400MB
730732
DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
731733
DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");
@@ -838,6 +840,11 @@ DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5");
838840
// Whether to enable adaptive flush thread adjustment
839841
DEFINE_mBool(enable_adaptive_flush_threads, "true");
840842

843+
// Whether to block writes when one table has too many pending flush memtables on this BE.
844+
DEFINE_mBool(enable_table_memtable_flush_backpressure, "true");
845+
// Max pending flush memtables for one table on this BE before blocking new writes.
846+
DEFINE_mInt32(table_memtable_flush_pending_count_limit, "10");
847+
841848
// config for tablet meta checkpoint
842849
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
843850
DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");

be/src/common/config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
785785
// max write buffer size before flush, default 200MB
786786
DECLARE_mInt64(write_buffer_size);
787787
DECLARE_mBool(enable_adaptive_write_buffer_size);
788+
// Whether random bucket load rotates to the next local bucket when memtable flushes.
789+
DECLARE_mBool(enable_adaptive_random_bucket_load_bucket_rotation);
788790
// max buffer size used in memtable for the aggregated table, default 400MB
789791
DECLARE_mInt64(write_buffer_size_for_agg);
790792

@@ -894,6 +896,11 @@ DECLARE_mDouble(min_flush_thread_num_per_cpu);
894896
// Whether to enable adaptive flush thread adjustment
895897
DECLARE_mBool(enable_adaptive_flush_threads);
896898

899+
// Whether to block writes when one table has too many pending flush memtables on this BE.
900+
DECLARE_mBool(enable_table_memtable_flush_backpressure);
901+
// Max pending flush memtables for one table on this BE before blocking new writes.
902+
DECLARE_mInt32(table_memtable_flush_pending_count_limit);
903+
897904
// config for tablet meta checkpoint
898905
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
899906
DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);

be/src/exec/sink/delta_writer_v2_pool.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "exec/sink/delta_writer_v2_pool.h"
1919

20+
#include "exec/sink/vtablet_finder.h"
2021
#include "load/delta_writer/delta_writer_v2.h"
2122
#include "runtime/runtime_profile.h"
2223

be/src/exec/sink/vrow_distribution.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,9 @@ void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row
289289
if (!_skip[i]) {
290290
row_ids.emplace_back(i);
291291
partition_ids.emplace_back(_partitions[i]->id);
292-
tablet_ids.emplace_back(_tablet_ids[i]);
292+
if (!_tablet_finder->is_adaptive_random_bucket()) {
293+
tablet_ids.emplace_back(_tablet_ids[i]);
294+
}
293295
}
294296
}
295297
}
@@ -312,7 +314,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
312314
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
313315
row_ids.emplace_back(i);
314316
partition_ids.emplace_back(_partitions[i]->id);
315-
tablet_ids.emplace_back(_tablet_ids[i]);
317+
if (!_tablet_finder->is_adaptive_random_bucket()) {
318+
tablet_ids.emplace_back(_tablet_ids[i]);
319+
}
316320
}
317321
}
318322
} else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
@@ -331,7 +335,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
331335
if (filter[i] != 0 && !_skip[i]) {
332336
row_ids.emplace_back(i);
333337
partition_ids.emplace_back(_partitions[i]->id);
334-
tablet_ids.emplace_back(_tablet_ids[i]);
338+
if (!_tablet_finder->is_adaptive_random_bucket()) {
339+
tablet_ids.emplace_back(_tablet_ids[i]);
340+
}
335341
}
336342
}
337343
}
@@ -342,7 +348,9 @@ Status VRowDistribution::_filter_block_by_skip_and_where_clause(
342348
Status VRowDistribution::_filter_block(Block* block,
343349
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
344350
for (int i = 0; i < _schema->indexes().size(); i++) {
345-
_get_tablet_ids(block, i, _tablet_ids);
351+
if (!_tablet_finder->is_adaptive_random_bucket()) {
352+
_get_tablet_ids(block, i, _tablet_ids);
353+
}
346354
auto& where_clause = _schema->indexes()[i]->where_clause;
347355
if (where_clause != nullptr) {
348356
RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause,
@@ -524,7 +532,9 @@ void VRowDistribution::_reset_row_part_tablet_ids(
524532
// This is important for performance.
525533
row_ids.reserve(rows);
526534
partition_ids.reserve(rows);
527-
tablet_ids.reserve(rows);
535+
if (!_tablet_finder->is_adaptive_random_bucket()) {
536+
tablet_ids.reserve(rows);
537+
}
528538
}
529539
}
530540

be/src/exec/sink/vrow_distribution.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ class RowPartTabletIds {
5757
std::string value;
5858
value.reserve(row_ids.size() * 15);
5959
for (int i = 0; i < row_ids.size(); i++) {
60-
value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
60+
if (i < tablet_ids.size()) {
61+
value.append(
62+
fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
63+
} else {
64+
value.append(fmt::format("[{}, {}]", row_ids[i], partition_ids[i]));
65+
}
6166
}
6267
return value;
6368
}

be/src/exec/sink/vtablet_finder.cpp

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,89 @@
2222
#include <gen_cpp/FrontendService_types.h>
2323
#include <glog/logging.h>
2424

25+
#include <algorithm>
2526
#include <string>
2627
#include <utility>
2728

2829
#include "common/compiler_util.h" // IWYU pragma: keep
30+
#include "common/config.h"
2931
#include "common/status.h"
3032
#include "core/block/block.h"
3133
#include "runtime/runtime_state.h"
3234
#include "storage/tablet_info.h"
3335

3436
namespace doris {
37+
38+
void AdaptiveRandomBucketState::init_partition(int64_t partition_id,
39+
const std::vector<int64_t>& tablets,
40+
const std::vector<int32_t>& bucket_seqs,
41+
int32_t start_tablet_idx) {
42+
if (partition_id < 0 || tablets.empty()) {
43+
return;
44+
}
45+
std::lock_guard<std::mutex> lock(_mutex);
46+
if (_partition_states.contains(partition_id)) {
47+
return;
48+
}
49+
50+
PartitionState state;
51+
state.partition_id = partition_id;
52+
state.tablets = tablets;
53+
state.bucket_seqs = bucket_seqs;
54+
if (start_tablet_idx >= 0 && start_tablet_idx < state.tablets.size()) {
55+
state.tablet_pos = start_tablet_idx;
56+
}
57+
state.current_tablet_id = state.tablets[state.tablet_pos];
58+
59+
for (int32_t tablet_pos = 0; tablet_pos < state.tablets.size(); ++tablet_pos) {
60+
_tablet_to_partition[state.tablets[tablet_pos]] = partition_id;
61+
_tablet_to_bucket[state.tablets[tablet_pos]] = tablet_pos;
62+
}
63+
_partition_states.emplace(partition_id, std::move(state));
64+
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", partition=" << partition_id
65+
<< ", local tablet count=" << tablets.size()
66+
<< ", start tablet=" << _partition_states.at(partition_id).current_tablet_id;
67+
}
68+
69+
int64_t AdaptiveRandomBucketState::current_tablet(int64_t partition_id) {
70+
std::lock_guard<std::mutex> lock(_mutex);
71+
auto it = _partition_states.find(partition_id);
72+
if (it == _partition_states.end()) {
73+
return -1;
74+
}
75+
return it->second.current_tablet_id;
76+
}
77+
78+
void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
79+
if (!config::enable_adaptive_random_bucket_load_bucket_rotation) {
80+
return;
81+
}
82+
std::lock_guard<std::mutex> lock(_mutex);
83+
auto partition_it = _tablet_to_partition.find(tablet_id);
84+
if (partition_it == _tablet_to_partition.end()) {
85+
return;
86+
}
87+
auto state_it = _partition_states.find(partition_it->second);
88+
if (state_it == _partition_states.end()) {
89+
return;
90+
}
91+
auto bucket_it = _tablet_to_bucket.find(tablet_id);
92+
if (bucket_it == _tablet_to_bucket.end()) {
93+
return;
94+
}
95+
auto& state = state_it->second;
96+
if (bucket_it->second != state.tablet_pos) {
97+
return;
98+
}
99+
int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size());
100+
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id
101+
<< ", partition=" << state.partition_id << " shared rotate tablet "
102+
<< state.current_tablet_id << " -> " << state.tablets[next_pos]
103+
<< " after tablet=" << tablet_id << " memtable flushed";
104+
state.tablet_pos = next_pos;
105+
state.current_tablet_id = state.tablets[next_pos];
106+
}
107+
35108
Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
36109
std::vector<VOlapTablePartition*>& partitions,
37110
std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,
@@ -82,8 +155,11 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row
82155

83156
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
84157
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
158+
} else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
159+
// Receiver-side random bucket mode only needs partition ids on sender side.
160+
// The receiver decides the concrete tablet from its local ordered tablet list.
85161
} else {
86-
// for random distribution
162+
// FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK
87163
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
88164
&_partition_to_tablet_map);
89165
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {

be/src/exec/sink/vtablet_finder.h

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,45 @@
1919

2020
#include <cstdint>
2121
#include <map>
22+
#include <memory>
23+
#include <mutex>
24+
#include <unordered_map>
25+
#include <vector>
2226

2327
#include "common/status.h"
2428
#include "core/block/block.h"
2529
#include "exec/common/hash_table/phmap_fwd_decl.h"
2630
#include "storage/tablet_info.h"
2731
#include "util/bitmap.h"
32+
#include "util/uid_util.h"
2833

2934
namespace doris {
3035

36+
class AdaptiveRandomBucketState {
37+
public:
38+
explicit AdaptiveRandomBucketState(UniqueId load_id) : _load_id(load_id) {}
39+
40+
void init_partition(int64_t partition_id, const std::vector<int64_t>& tablets,
41+
const std::vector<int32_t>& bucket_seqs, int32_t start_tablet_idx);
42+
int64_t current_tablet(int64_t partition_id);
43+
void rotate_by_tablet(int64_t tablet_id);
44+
45+
private:
46+
struct PartitionState {
47+
int64_t partition_id = -1;
48+
std::vector<int64_t> tablets;
49+
std::vector<int32_t> bucket_seqs;
50+
int32_t tablet_pos = 0;
51+
int64_t current_tablet_id = -1;
52+
};
53+
54+
std::mutex _mutex;
55+
UniqueId _load_id;
56+
std::unordered_map<int64_t, PartitionState> _partition_states;
57+
std::unordered_map<int64_t, int64_t> _tablet_to_partition;
58+
std::unordered_map<int64_t, int32_t> _tablet_to_bucket;
59+
};
60+
3161
class OlapTabletFinder {
3262
public:
3363
// FIND_TABLET_EVERY_ROW is used for hash distribution info, which indicates that we
@@ -37,7 +67,13 @@ class OlapTabletFinder {
3767
// FIND_TABLET_EVERY_SINK is used for random distribution info when load_to_single_tablet set to true,
3868
// which indicates that we should only compute tablet index in the corresponding partition once for the
3969
// whole time in olap table sink
40-
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
70+
// FIND_TABLET_RANDOM_BUCKET is used for V1 receiver-side random bucket mode.
71+
enum FindTabletMode {
72+
FIND_TABLET_EVERY_ROW,
73+
FIND_TABLET_EVERY_BATCH,
74+
FIND_TABLET_EVERY_SINK,
75+
FIND_TABLET_RANDOM_BUCKET
76+
};
4177

4278
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
4379
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};
@@ -51,7 +87,14 @@ class OlapTabletFinder {
5187
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
5288
}
5389

54-
bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
90+
bool is_adaptive_random_bucket() const {
91+
return _find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET;
92+
}
93+
94+
bool is_single_tablet() {
95+
return _find_tablet_mode != FindTabletMode::FIND_TABLET_RANDOM_BUCKET &&
96+
_partition_to_tablet_map.size() == 1;
97+
}
5598

5699
// all partitions for multi find-processes of its relative writer.
57100
const flat_hash_set<int64_t>& partition_ids() { return _partition_ids; }

0 commit comments

Comments
 (0)