Skip to content

Commit b31f64e

Browse files
committed
test
1 parent 7b51069 commit b31f64e

4 files changed

Lines changed: 61 additions & 6 deletions

File tree

be/src/exec/sink/vtablet_finder.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ void AdaptiveRandomBucketState::init_partition(VOlapTablePartition* part) {
8080

8181
PartitionState state;
8282
state.partition_id = part->id;
83-
state.tablets.assign(part->indexes[0].tablets.begin(), part->indexes[0].tablets.end());
83+
state.index_tablets.reserve(part->indexes.size());
84+
for (const auto& index : part->indexes) {
85+
state.index_tablets.emplace_back(index.tablets.begin(), index.tablets.end());
86+
}
8487
state.local_bucket_seqs = part->local_bucket_seqs;
8588

8689
auto it = std::find(state.local_bucket_seqs.begin(), state.local_bucket_seqs.end(),
@@ -89,12 +92,13 @@ void AdaptiveRandomBucketState::init_partition(VOlapTablePartition* part) {
8992
state.seq_pos = static_cast<int32_t>(it - state.local_bucket_seqs.begin());
9093
}
9194
int32_t current_bucket = state.local_bucket_seqs[state.seq_pos];
92-
state.current_tablet_id = state.tablets[current_bucket];
9395

94-
for (const auto& index : part->indexes) {
96+
for (int32_t index_idx = 0; index_idx < part->indexes.size(); ++index_idx) {
97+
const auto& index = part->indexes[index_idx];
9598
for (int32_t bucket = 0; bucket < index.tablets.size(); ++bucket) {
9699
_tablet_to_partition[index.tablets[bucket]] = part->id;
97100
_tablet_to_bucket[index.tablets[bucket]] = bucket;
101+
_tablet_to_index[index.tablets[bucket]] = index_idx;
98102
}
99103
}
100104
_partition_states.emplace(part->id, std::move(state));
@@ -113,6 +117,25 @@ int64_t AdaptiveRandomBucketState::current_bucket(int64_t partition_id) {
113117
return it->second.local_bucket_seqs[it->second.seq_pos];
114118
}
115119

120+
int64_t AdaptiveRandomBucketState::current_tablet_by_tablet(int64_t tablet_id) {
121+
std::lock_guard<std::mutex> lock(_mutex);
122+
auto partition_it = _tablet_to_partition.find(tablet_id);
123+
if (partition_it == _tablet_to_partition.end()) {
124+
return tablet_id;
125+
}
126+
auto state_it = _partition_states.find(partition_it->second);
127+
if (state_it == _partition_states.end()) {
128+
return tablet_id;
129+
}
130+
auto index_it = _tablet_to_index.find(tablet_id);
131+
if (index_it == _tablet_to_index.end()) {
132+
return tablet_id;
133+
}
134+
auto& state = state_it->second;
135+
int32_t current_bucket = state.local_bucket_seqs[state.seq_pos];
136+
return state.index_tablets[index_it->second][current_bucket];
137+
}
138+
116139
void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
117140
if (!config::enable_adaptive_random_bucket_load_bucket_rotation) {
118141
return;
@@ -142,7 +165,6 @@ void AdaptiveRandomBucketState::rotate_by_tablet(int64_t tablet_id) {
142165
<< " shared rotate bucket " << current_bucket << " -> " << next_bucket
143166
<< " after tablet=" << tablet_id << " memtable flushed";
144167
state.seq_pos = next_pos;
145-
state.current_tablet_id = state.tablets[next_bucket];
146168
}
147169

148170
// Computes local_bucket_seqs for a random-distribution partition and updates

be/src/exec/sink/vtablet_finder.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,23 @@ class AdaptiveRandomBucketState {
4747

4848
void init_partition(VOlapTablePartition* part);
4949
int64_t current_bucket(int64_t partition_id);
50+
int64_t current_tablet_by_tablet(int64_t tablet_id);
5051
void rotate_by_tablet(int64_t tablet_id);
5152

5253
private:
5354
struct PartitionState {
5455
int64_t partition_id = -1;
55-
std::vector<int64_t> tablets;
56+
std::vector<std::vector<int64_t>> index_tablets;
5657
std::vector<int32_t> local_bucket_seqs;
5758
int32_t seq_pos = 0;
58-
int64_t current_tablet_id = -1;
5959
};
6060

6161
std::mutex _mutex;
6262
UniqueId _load_id;
6363
std::unordered_map<int64_t, PartitionState> _partition_states;
6464
std::unordered_map<int64_t, int64_t> _tablet_to_partition;
6565
std::unordered_map<int64_t, int32_t> _tablet_to_bucket;
66+
std::unordered_map<int64_t, int32_t> _tablet_to_index;
6667
};
6768

6869
class AdaptiveRandomBucketStatePool {
@@ -113,6 +114,13 @@ class OlapTabletFinder {
113114
}
114115
}
115116

117+
int64_t current_adaptive_random_bucket_tablet(int64_t tablet_id) {
118+
if (_adaptive_random_bucket_state == nullptr) {
119+
return tablet_id;
120+
}
121+
return _adaptive_random_bucket_state->current_tablet_by_tablet(tablet_id);
122+
}
123+
116124
Status find_tablets(RuntimeState* state, Block* block, int rows,
117125
std::vector<VOlapTablePartition*>& partitions,
118126
std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,

be/src/exec/sink/writer/vtablet_writer.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,28 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) {
816816
return Status::OK();
817817
}
818818

819+
void VNodeChannel::rewrite_tablet_ids_by_adaptive_random_bucket(
820+
PTabletWriterAddBlockRequest* request) {
821+
if (!_parent->_tablet_finder->is_adaptive_random_bucket() || request->eos()) {
822+
return;
823+
}
824+
int64_t rewrite_count = 0;
825+
for (int i = 0; i < request->tablet_ids_size(); ++i) {
826+
int64_t old_tablet_id = request->tablet_ids(i);
827+
int64_t current_tablet_id =
828+
_parent->_tablet_finder->current_adaptive_random_bucket_tablet(old_tablet_id);
829+
if (current_tablet_id != old_tablet_id) {
830+
request->set_tablet_ids(i, current_tablet_id);
831+
rewrite_count++;
832+
}
833+
}
834+
if (rewrite_count > 0) {
835+
LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << print_id(_parent->_load_id)
836+
<< ", node_id=" << _node_id << " rewrite pending tablet ids, rows="
837+
<< request->tablet_ids_size() << ", rewrite_count=" << rewrite_count;
838+
}
839+
}
840+
819841
static void injection_full_gc_fn() {
820842
MemoryReclamation::revoke_process_memory("injection_full_gc_fn");
821843
}
@@ -931,6 +953,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
931953
auto request = std::move(send_block.second); // doesn't need to be saved in heap
932954

933955
// tablet_ids has already set when add row
956+
rewrite_tablet_ids_by_adaptive_random_bucket(request.get());
934957
request->set_packet_seq(_next_packet_seq);
935958
auto block = mutable_block->to_block();
936959
CHECK(block.rows() == request->tablet_ids_size())

be/src/exec/sink/writer/vtablet_writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ class VNodeChannel {
266266
// when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it.
267267
void try_send_pending_block(RuntimeState* state);
268268

269+
void rewrite_tablet_ids_by_adaptive_random_bucket(PTabletWriterAddBlockRequest* request);
270+
269271
void clear_all_blocks();
270272

271273
// two ways to stop channel:

0 commit comments

Comments
 (0)