Skip to content

Commit 6c665ac

Browse files
committed
test
1 parent 69ab80b commit 6c665ac

4 files changed

Lines changed: 16 additions & 3 deletions

File tree

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
726726
// max write buffer size before flush, default 200MB
727727
DEFINE_mInt64(write_buffer_size, "209715200");
728728
DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
729+
// Whether random bucket load rotates to the next local bucket when memtable flushes.
730+
DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true");
729731
// max buffer size used in memtable for the aggregated table, default 400MB
730732
DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
731733
DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
785785
// max write buffer size before flush, default 200MB
786786
DECLARE_mInt64(write_buffer_size);
787787
DECLARE_mBool(enable_adaptive_write_buffer_size);
788+
// Whether random bucket load rotates to the next local bucket when memtable flushes.
789+
DECLARE_mBool(enable_adaptive_random_bucket_load_bucket_rotation);
788790
// max buffer size used in memtable for the aggregated table, default 400MB
789791
DECLARE_mInt64(write_buffer_size_for_agg);
790792

be/src/exec/sink/vtablet_finder.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <utility>
2828

2929
#include "common/compiler_util.h" // IWYU pragma: keep
30+
#include "common/config.h"
3031
#include "common/status.h"
3132
#include "core/block/block.h"
3233
#include "runtime/runtime_state.h"
@@ -57,7 +58,7 @@ std::shared_ptr<AdaptiveRandomBucketState> AdaptiveRandomBucketStatePool::get_or
5758
if (auto state = _pool[load_id].lock()) {
5859
return state;
5960
}
60-
auto state = std::make_shared<AdaptiveRandomBucketState>();
61+
auto state = std::make_shared<AdaptiveRandomBucketState>(load_id);
6162
_pool[load_id] = state;
6263
return state;
6364
}
@@ -97,7 +98,8 @@ void AdaptiveRandomBucketState::init_partition(VOlapTablePartition* part) {
9798
}
9899
}
99100
_partition_states.emplace(part->id, std::move(state));
100-
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: partition=" << part->id
101+
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id
102+
<< ", partition=" << part->id
101103
<< " shared start bucket=" << current_bucket
102104
<< ", local_bucket_seqs=" << format_bucket_seqs(part->local_bucket_seqs);
103105
}
@@ -112,6 +114,9 @@ int64_t AdaptiveRandomBucketState::current_bucket(int64_t partition_id) {
112114
}
113115

114116
void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
117+
if (!config::enable_adaptive_random_bucket_load_bucket_rotation) {
118+
return;
119+
}
115120
std::lock_guard<std::mutex> lock(_mutex);
116121
auto partition_it = _tablet_to_partition.find(tablet_id);
117122
if (partition_it == _tablet_to_partition.end()) {
@@ -132,7 +137,8 @@ void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
132137
}
133138
int32_t next_pos = (state.seq_pos + 1) % static_cast<int32_t>(state.local_bucket_seqs.size());
134139
int32_t next_bucket = state.local_bucket_seqs[next_pos];
135-
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: partition=" << state.partition_id
140+
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id
141+
<< ", partition=" << state.partition_id
136142
<< " shared rotate bucket " << current_bucket << " -> " << next_bucket
137143
<< " after tablet=" << tablet_id << " memtable flushed";
138144
state.seq_pos = next_pos;

be/src/exec/sink/vtablet_finder.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ void compute_local_bucket_seqs(VOlapTablePartition* part, int64_t be_id,
4343

4444
class AdaptiveRandomBucketState {
4545
public:
46+
explicit AdaptiveRandomBucketState(UniqueId load_id) : _load_id(load_id) {}
47+
4648
void init_partition(VOlapTablePartition* part);
4749
int64_t current_bucket(int64_t partition_id);
4850
void rotate_by_tablet(int64_t tablet_id);
@@ -57,6 +59,7 @@ class AdaptiveRandomBucketState {
5759
};
5860

5961
std::mutex _mutex;
62+
UniqueId _load_id;
6063
std::unordered_map<int64_t, PartitionState> _partition_states;
6164
std::unordered_map<int64_t, int64_t> _tablet_to_partition;
6265
std::unordered_map<int64_t, int32_t> _tablet_to_bucket;

0 commit comments

Comments
 (0)