Skip to content

Commit 4311778

Browse files
committed
1
1 parent 73890b1 commit 4311778

File tree

9 files changed

+426
-0
lines changed

9 files changed

+426
-0
lines changed

cloud/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ CONF_mBool(enable_mvcc_meta_check, "false");
158158

159159
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
160160

161+
// KV cache config
162+
CONF_mBool(enable_tablet_index_cache, "true");
163+
CONF_mInt64(ms_tablet_index_cache_capacity, "10000");
164+
CONF_mInt64(recycler_tablet_index_cache_capacity, "10000");
165+
CONF_mInt64(tablet_index_cache_ttl_seconds, "0"); // 0 means no TTL
166+
161167
CONF_String(test_s3_ak, "");
162168
CONF_String(test_s3_sk, "");
163169
CONF_String(test_s3_endpoint, "");

cloud/src/common/kv_cache.h

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <array>
21+
#include <chrono>
22+
#include <list>
23+
#include <memory>
24+
#include <mutex>
25+
#include <tuple>
26+
#include <unordered_map>
27+
28+
namespace doris::cloud {
29+
30+
// Sharded LRU Cache to reduce lock contention
31+
// KeyTuple: std::tuple type, corresponding to BasicKeyInfo::base_type in keys.h
32+
// ValuePB: protobuf message type
33+
template <typename KeyTuple, typename ValuePB, size_t NumShards = 16>
34+
class KvCache {
35+
public:
36+
explicit KvCache(size_t capacity, int64_t ttl_seconds = 0)
37+
: shard_capacity_(capacity / NumShards + 1), ttl_seconds_(ttl_seconds) {
38+
for (auto& shard : shards_) {
39+
shard = std::make_unique<Shard>(shard_capacity_, ttl_seconds);
40+
}
41+
}
42+
43+
// Query cache, returns true and fills value if hit
44+
bool get(const KeyTuple& key, ValuePB* value) { return get_shard(key)->get(key, value); }
45+
46+
// Write to cache
47+
void put(const KeyTuple& key, const ValuePB& value) { get_shard(key)->put(key, value); }
48+
49+
// Invalidate single entry
50+
void invalidate(const KeyTuple& key) { get_shard(key)->invalidate(key); }
51+
52+
void clear() {
53+
for (auto& shard : shards_) {
54+
shard->clear();
55+
}
56+
}
57+
58+
size_t size() const {
59+
size_t total = 0;
60+
for (const auto& shard : shards_) {
61+
total += shard->size();
62+
}
63+
return total;
64+
}
65+
66+
private:
67+
struct Entry {
68+
KeyTuple key;
69+
ValuePB value;
70+
int64_t expire_time;
71+
};
72+
73+
struct KeyHash {
74+
size_t operator()(const KeyTuple& k) const {
75+
return std::apply(
76+
[](const auto&... args) {
77+
size_t seed = 0;
78+
((seed ^= std::hash<std::decay_t<decltype(args)>> {}(args) + 0x9e3779b9 +
79+
(seed << 6) + (seed >> 2)),
80+
...);
81+
return seed;
82+
},
83+
k);
84+
}
85+
};
86+
87+
class Shard {
88+
public:
89+
explicit Shard(size_t capacity, int64_t ttl_seconds)
90+
: capacity_(capacity), ttl_seconds_(ttl_seconds) {}
91+
92+
bool get(const KeyTuple& key, ValuePB* value) {
93+
std::lock_guard lock(mu_);
94+
auto it = map_.find(key);
95+
if (it == map_.end()) {
96+
return false;
97+
}
98+
// Check TTL expiration
99+
if (ttl_seconds_ > 0 && it->second->expire_time < now_seconds()) {
100+
list_.erase(it->second);
101+
map_.erase(it);
102+
return false;
103+
}
104+
list_.splice(list_.begin(), list_, it->second);
105+
*value = it->second->value;
106+
return true;
107+
}
108+
109+
void put(const KeyTuple& key, const ValuePB& value) {
110+
std::lock_guard lock(mu_);
111+
int64_t expire_time = ttl_seconds_ > 0 ? now_seconds() + ttl_seconds_ : 0;
112+
auto it = map_.find(key);
113+
if (it != map_.end()) {
114+
it->second->value = value;
115+
it->second->expire_time = expire_time;
116+
list_.splice(list_.begin(), list_, it->second);
117+
return;
118+
}
119+
if (map_.size() >= capacity_) {
120+
map_.erase(list_.back().key);
121+
list_.pop_back();
122+
}
123+
list_.push_front({key, value, expire_time});
124+
map_[key] = list_.begin();
125+
}
126+
127+
void invalidate(const KeyTuple& key) {
128+
std::lock_guard lock(mu_);
129+
auto it = map_.find(key);
130+
if (it != map_.end()) {
131+
list_.erase(it->second);
132+
map_.erase(it);
133+
}
134+
}
135+
136+
void clear() {
137+
std::lock_guard lock(mu_);
138+
map_.clear();
139+
list_.clear();
140+
}
141+
142+
size_t size() const {
143+
std::lock_guard lock(mu_);
144+
return map_.size();
145+
}
146+
147+
private:
148+
static int64_t now_seconds() {
149+
return std::chrono::duration_cast<std::chrono::seconds>(
150+
std::chrono::steady_clock::now().time_since_epoch())
151+
.count();
152+
}
153+
154+
size_t capacity_;
155+
int64_t ttl_seconds_;
156+
mutable std::mutex mu_;
157+
std::list<Entry> list_;
158+
std::unordered_map<KeyTuple, typename std::list<Entry>::iterator, KeyHash> map_;
159+
};
160+
161+
Shard* get_shard(const KeyTuple& key) {
162+
size_t hash = KeyHash {}(key);
163+
return shards_[hash % NumShards].get();
164+
}
165+
166+
size_t shard_capacity_;
167+
int64_t ttl_seconds_;
168+
std::array<std::unique_ptr<Shard>, NumShards> shards_;
169+
};
170+
171+
} // namespace doris::cloud
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "common/kv_cache.h"
21+
#include "gen_cpp/cloud.pb.h"
22+
23+
namespace doris::cloud {
24+
25+
struct CacheConfig {
26+
size_t tablet_index_capacity = 10000;
27+
int64_t tablet_index_ttl_seconds = 0;
28+
};
29+
30+
class KvCacheManager {
31+
public:
32+
using TabletIndexCache = KvCache<std::tuple<std::string, int64_t>, TabletIndexPB>;
33+
34+
explicit KvCacheManager(const CacheConfig& config)
35+
: tablet_index_cache_(std::make_unique<TabletIndexCache>(
36+
config.tablet_index_capacity, config.tablet_index_ttl_seconds)) {}
37+
38+
TabletIndexCache* tablet_index_cache() { return tablet_index_cache_.get(); }
39+
40+
private:
41+
std::unique_ptr<TabletIndexCache> tablet_index_cache_;
42+
};
43+
44+
} // namespace doris::cloud

cloud/src/meta-service/meta_service.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "common/bvars.h"
5353
#include "common/config.h"
5454
#include "common/encryption_util.h"
55+
#include "common/kv_cache_manager.h"
5556
#include "common/logging.h"
5657
#include "common/stats.h"
5758
#include "common/stopwatch.h"
@@ -78,6 +79,8 @@ using namespace std::chrono;
7879

7980
namespace doris::cloud {
8081

82+
static KvCacheManager* g_ms_cache_manager = nullptr;
83+
8184
MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
8285
std::shared_ptr<ResourceManager> resource_mgr,
8386
std::shared_ptr<RateLimiter> rate_limiter,
@@ -90,6 +93,12 @@ MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
9093
snapshot_manager_(std::move(snapshot_manager)) {
9194
rate_limiter_->init(this);
9295
delete_bitmap_lock_white_list_->init();
96+
CacheConfig config;
97+
config.tablet_index_capacity = config::ms_tablet_index_cache_capacity;
98+
config.tablet_index_ttl_seconds = config::tablet_index_cache_ttl_seconds;
99+
if (config::enable_tablet_index_cache) {
100+
g_ms_cache_manager = new KvCacheManager(config);
101+
}
93102
}
94103

95104
MetaServiceImpl::~MetaServiceImpl() = default;
@@ -191,6 +200,15 @@ bool is_dropped_tablet(Transaction* txn, const std::string& instance_id, int64_t
191200

192201
void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn,
193202
const std::string& instance_id, int64_t tablet_id, TabletIndexPB& tablet_idx) {
203+
// Cache lookup
204+
auto cache_key = std::make_tuple(instance_id, tablet_id);
205+
if (g_ms_cache_manager &&
206+
g_ms_cache_manager->tablet_index_cache()->get(cache_key, &tablet_idx)) {
207+
LOG(INFO) << "finish get tablet index from cache, tablet_id= " << tablet_id;
208+
return;
209+
}
210+
211+
// FDB read
194212
std::string key, val;
195213
meta_tablet_idx_key({instance_id, tablet_id}, &key);
196214
TxnErrorCode err = txn->get(key, &val);
@@ -216,6 +234,11 @@ void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn,
216234
<< " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key);
217235
return;
218236
}
237+
238+
// Cache put
239+
if (g_ms_cache_manager) {
240+
g_ms_cache_manager->tablet_index_cache()->put(cache_key, tablet_idx);
241+
}
219242
}
220243

221244
void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,

cloud/src/recycler/recycler.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include <variant>
4646

4747
#include "common/defer.h"
48+
#include "common/kv_cache_manager.h"
4849
#include "common/stopwatch.h"
4950
#include "meta-service/meta_service.h"
5051
#include "meta-service/meta_service_helper.h"
@@ -408,6 +409,10 @@ int Recycler::start(brpc::Server* server) {
408409
g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency);
409410
S3Environment::getInstance();
410411

412+
if(config::enable_tablet_index_cache) {
413+
init_recycler_cache();
414+
}
415+
411416
if (config::enable_checker) {
412417
checker_ = std::make_unique<Checker>(txn_kv_);
413418
int ret = checker_->start();
@@ -2806,7 +2811,22 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
28062811
}
28072812
for (auto& k : tablet_idx_keys) {
28082813
txn->remove(k);
2814+
// Invalidate cache for removed tablet_idx_keys
2815+
if (g_recycler_cache_manager) {
2816+
// Extract tablet_id from key: meta_tablet_idx_key({instance_id, tablet_id})
2817+
// The key format is known, we can parse tablet_id from it
2818+
std::string_view k1 = k;
2819+
k1.remove_prefix(1);
2820+
// 0x01 "meta" ${instance_id} "tablet_index" ${tablet_id}
2821+
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2822+
decode_key(&k1, &out);
2823+
DCHECK_EQ(out.size(), 4) << k1;
2824+
auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
2825+
g_recycler_cache_manager->tablet_index_cache()->invalidate(
2826+
std::make_tuple(instance_id_, tablet_id));
2827+
}
28092828
}
2829+
28102830
for (auto& k : restore_job_keys) {
28112831
txn->remove(k);
28122832
}

cloud/src/recycler/util.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,24 @@
2121

2222
#include <cstdint>
2323

24+
#include "common/config.h"
25+
#include "common/kv_cache_manager.h"
2426
#include "common/util.h"
2527
#include "meta-service/meta_service_schema.h"
2628
#include "meta-store/keys.h"
2729
#include "meta-store/txn_kv.h"
2830
#include "meta-store/txn_kv_error.h"
2931

3032
namespace doris::cloud {
33+
34+
KvCacheManager* g_recycler_cache_manager = nullptr;
35+
36+
void init_recycler_cache() {
37+
CacheConfig config;
38+
config.tablet_index_capacity = config::recycler_tablet_index_cache_capacity;
39+
config.tablet_index_ttl_seconds = config::tablet_index_cache_ttl_seconds;
40+
g_recycler_cache_manager = new KvCacheManager(config);
41+
}
3142
namespace config {
3243
extern int32_t recycle_job_lease_expired_ms;
3344
} // namespace config
@@ -237,6 +248,14 @@ int lease_instance_recycle_job(TxnKv* txn_kv, std::string_view key, const std::s
237248
// ret: 0: success, 1: tablet not found, -1: failed
238249
int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
239250
TabletIndexPB& tablet_idx) {
251+
// Cache lookup
252+
auto cache_key = std::make_tuple(instance_id, tablet_id);
253+
if (g_recycler_cache_manager &&
254+
g_recycler_cache_manager->tablet_index_cache()->get(cache_key, &tablet_idx)) {
255+
LOG(INFO) << "finish get tablet index from cache, tablet_id= " << tablet_id;
256+
return 0;
257+
}
258+
240259
std::unique_ptr<Transaction> txn;
241260
TxnErrorCode err = txn_kv->create_txn(&txn);
242261
if (err != TxnErrorCode::TXN_OK) {
@@ -266,6 +285,11 @@ int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet
266285
<< " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key);
267286
return -1;
268287
}
288+
289+
// Cache put
290+
if (g_recycler_cache_manager) {
291+
g_recycler_cache_manager->tablet_index_cache()->put(cache_key, tablet_idx);
292+
}
269293
return 0;
270294
}
271295

cloud/src/recycler/util.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,10 @@ int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet
9797

9898
int get_tablet_meta(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
9999
TabletMetaCloudPB& tablet_meta);
100+
101+
void init_recycler_cache();
102+
103+
class KvCacheManager;
104+
extern KvCacheManager* g_recycler_cache_manager;
105+
100106
} // namespace doris::cloud

0 commit comments

Comments
 (0)