Skip to content

Commit 2d9ec45

Browse files
authored
[improvement](cloud) Add enable_recycler config to skip recycler dynamically (#63286)
1 parent 4483daf commit 2d9ec45

3 files changed

Lines changed: 110 additions & 18 deletions

File tree

cloud/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,4 +443,6 @@ CONF_Bool(enable_instance_update_watcher, "true");
443443
CONF_mBool(advance_txn_lazy_commit_during_reads, "true");
444444
CONF_mBool(wait_txn_lazy_commit_during_reads, "true");
445445

446+
// Whether to enable recycler. If false, the recycler will skip scanning instances to pending queue.
447+
CONF_mBool(enable_recycler, "true");
446448
} // namespace doris::cloud::config

cloud/src/recycler/recycler.cpp

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -249,26 +249,30 @@ void Recycler::instance_scanner_callback() {
249249
std::this_thread::sleep_for(
250250
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
251251
while (!stopped()) {
252-
std::vector<InstanceInfoPB> instances;
253-
get_all_instances(txn_kv_.get(), instances);
254-
// TODO(plat1ko): delete job recycle kv of non-existent instances
255-
LOG(INFO) << "Recycler get instances: " << [&instances] {
256-
std::stringstream ss;
257-
for (auto& i : instances) ss << ' ' << i.instance_id();
258-
return ss.str();
259-
}();
260-
if (!instances.empty()) {
261-
// enqueue instances
262-
std::lock_guard lock(mtx_);
263-
for (auto& instance : instances) {
264-
if (instance_filter_.filter_out(instance.instance_id())) continue;
265-
auto [_, success] = pending_instance_set_.insert(instance.instance_id());
266-
// skip instance already in pending queue
267-
if (success) {
268-
pending_instance_queue_.push_back(std::move(instance));
252+
if (config::enable_recycler) {
253+
std::vector<InstanceInfoPB> instances;
254+
get_all_instances(txn_kv_.get(), instances);
255+
// TODO(plat1ko): delete job recycle kv of non-existent instances
256+
LOG(INFO) << "Recycler get instances: " << [&instances] {
257+
std::stringstream ss;
258+
for (auto& i : instances) ss << ' ' << i.instance_id();
259+
return ss.str();
260+
}();
261+
if (!instances.empty()) {
262+
// enqueue instances
263+
std::lock_guard lock(mtx_);
264+
for (auto& instance : instances) {
265+
if (instance_filter_.filter_out(instance.instance_id())) continue;
266+
auto [_, success] = pending_instance_set_.insert(instance.instance_id());
267+
// skip instance already in pending queue
268+
if (success) {
269+
pending_instance_queue_.push_back(std::move(instance));
270+
}
269271
}
272+
pending_instance_cond_.notify_all();
270273
}
271-
pending_instance_cond_.notify_all();
274+
} else {
275+
LOG(WARNING) << "Skip recycler since enable_recycler is false";
272276
}
273277
{
274278
std::unique_lock lock(mtx_);
@@ -298,6 +302,11 @@ void Recycler::recycle_callback() {
298302
// skip instance in recycling
299303
if (recycling_instance_map_.count(instance_id)) continue;
300304
}
305+
if (!config::enable_recycler) {
306+
LOG(WARNING) << "Skip recycle instance_id=" << instance_id
307+
<< " since enable_recycler is false";
308+
continue;
309+
}
301310
auto instance_recycler = std::make_shared<InstanceRecycler>(
302311
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
303312

cloud/test/recycler_test.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8693,4 +8693,85 @@ TEST(RecyclerTest, recycle_tablet_with_delete_file_failure) {
86938693
EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be deleted";
86948694
}
86958695
}
8696+
8697+
TEST(RecyclerTest, enable_recycler_default_true) {
8698+
EXPECT_TRUE(config::enable_recycler);
8699+
}
8700+
8701+
TEST(RecyclerTest, enable_recycler_skip_instance_scanner) {
8702+
auto txn_kv = std::make_shared<MemTxnKv>();
8703+
ASSERT_EQ(txn_kv->init(), 0);
8704+
8705+
bool old_val = config::enable_recycler;
8706+
config::enable_recycler = false;
8707+
DORIS_CLOUD_DEFER {
8708+
config::enable_recycler = old_val;
8709+
};
8710+
8711+
int64_t old_recycle_interval = config::recycle_interval_seconds;
8712+
config::recycle_interval_seconds = 0;
8713+
DORIS_CLOUD_DEFER {
8714+
config::recycle_interval_seconds = old_recycle_interval;
8715+
};
8716+
8717+
int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds;
8718+
config::recycler_sleep_before_scheduling_seconds = 0;
8719+
DORIS_CLOUD_DEFER {
8720+
config::recycler_sleep_before_scheduling_seconds = old_sleep;
8721+
};
8722+
8723+
Recycler recycler(txn_kv);
8724+
std::thread t([&]() { recycler.instance_scanner_callback(); });
8725+
8726+
// Let the callback complete one iteration:
8727+
// sleep(0) -> check enable_recycler (false, skip) -> wait_for(0, timeout)
8728+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
8729+
8730+
recycler.stopped_ = true;
8731+
recycler.notifier_.notify_all();
8732+
t.join();
8733+
8734+
EXPECT_TRUE(recycler.pending_instance_queue_.empty());
8735+
}
8736+
8737+
TEST(RecyclerTest, enable_recycler_skip_recycle_callback) {
8738+
auto txn_kv = std::make_shared<MemTxnKv>();
8739+
ASSERT_EQ(txn_kv->init(), 0);
8740+
8741+
bool old_val = config::enable_recycler;
8742+
config::enable_recycler = false;
8743+
DORIS_CLOUD_DEFER {
8744+
config::enable_recycler = old_val;
8745+
};
8746+
8747+
Recycler recycler(txn_kv);
8748+
8749+
InstanceInfoPB instance;
8750+
instance.set_instance_id("test_instance");
8751+
recycler.pending_instance_queue_.push_back(instance);
8752+
recycler.pending_instance_set_.insert("test_instance");
8753+
8754+
std::thread t([&]() { recycler.recycle_callback(); });
8755+
8756+
// Wait until the callback has popped the instance from the queue.
8757+
// Can not wait on pending_instance_cond_ here because the callback does
8758+
// not notify after popping, which may cause a deadlock: both the main
8759+
// thread and the callback end up waiting on the same CV with different
8760+
// predicates and no one will wake them up.
8761+
while (true) {
8762+
{
8763+
std::lock_guard lock(recycler.mtx_);
8764+
if (recycler.pending_instance_queue_.empty()) break;
8765+
}
8766+
std::this_thread::yield();
8767+
}
8768+
8769+
recycler.stopped_ = true;
8770+
recycler.pending_instance_cond_.notify_all();
8771+
t.join();
8772+
8773+
EXPECT_TRUE(recycler.pending_instance_queue_.empty());
8774+
EXPECT_TRUE(recycler.pending_instance_set_.empty());
8775+
EXPECT_TRUE(recycler.recycling_instance_map_.empty());
8776+
}
86968777
} // namespace doris::cloud

0 commit comments

Comments
 (0)