Skip to content

Commit 4a0c58b

Browse files
gavinchougavinchou
andauthored
[fix](cloud) Drain txn lazy committer workers before destruction (#63876)
## What Fix shutdown ordering in `TxnLazyCommitter` by explicitly stopping worker pools before member destruction can invalidate state used by worker callbacks. ## Why Lazy commit worker jobs keep a back pointer to `TxnLazyCommitter` and call back into `remove()`. They can also access the parallel commit pool and resource manager during `commit()`. With the default destructor, `running_tasks_`, `mutex_`, and `parallel_commit_pool_` are destroyed before `worker_pool_` is joined, which can lead to shutdown-time use-after-destruction. ## How - Add an explicit `TxnLazyCommitter` destructor. - Mark the committer as stopped before draining workers. - Stop and join the lazy commit worker pool before destroying task tracking state. - Stop the parallel commit pool after lazy workers are quiesced. - Make failed or post-shutdown submissions complete with an error instead of leaving waiters blocked. ## Tests - `sh format_code.sh cloud/src/meta-service/txn_lazy_committer.h` - `sh format_code.sh cloud/src/meta-service/txn_lazy_committer.cpp` - `sh run-cloud-ut.sh --run --fdb "fdb_cluster0:cluster0@10.26.20.4:4500"` - Build passed. - `txn_lazy_commit_test` passed 24/24 in the full run. - The full run had unrelated storage vault/HDFS failures in `meta_service_test`. - After tightening the submit/shutdown race: - `sh run-cloud-ut.sh --run --fdb "fdb_cluster0:cluster0@10.26.20.4:4500" --filter "txn_lazy_commit_test:*.*"` - Build passed; 22/24 passed, 2 tests failed due FDB `Timeout` while committing setup transactions. Co-authored-by: gavinchou <gavinchou@apache.org>
1 parent e072997 commit 4a0c58b

2 files changed

Lines changed: 54 additions & 12 deletions

File tree

cloud/src/meta-service/txn_lazy_committer.cpp

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -633,14 +633,20 @@ TxnLazyCommitTask::TxnLazyCommitTask(const std::string& instance_id, int64_t txn
633633
DCHECK(txn_id > 0);
634634
}
635635

636+
void TxnLazyCommitTask::finish(MetaServiceCode code, std::string msg) {
637+
{
638+
std::unique_lock lock(mutex_);
639+
finished_ = true;
640+
code_ = code;
641+
msg_ = std::move(msg);
642+
}
643+
cond_.notify_all();
644+
}
645+
636646
void TxnLazyCommitTask::commit() {
637647
StopWatch sw;
638648
DORIS_CLOUD_DEFER {
639-
{
640-
std::unique_lock lock(mutex_);
641-
this->finished_ = true;
642-
}
643-
this->cond_.notify_all();
649+
finish(code_, msg_);
644650
g_bvar_txn_lazy_committer_committing_duration << sw.elapsed_us();
645651
};
646652

@@ -965,6 +971,25 @@ TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv,
965971
parallel_commit_pool_->start();
966972
}
967973

974+
TxnLazyCommitter::~TxnLazyCommitter() {
975+
{
976+
std::unique_lock<std::mutex> lock(mutex_);
977+
stopped_ = true;
978+
}
979+
980+
if (worker_pool_ != nullptr) {
981+
worker_pool_->stop();
982+
}
983+
if (parallel_commit_pool_ != nullptr) {
984+
parallel_commit_pool_->stop();
985+
}
986+
987+
{
988+
std::unique_lock<std::mutex> lock(mutex_);
989+
running_tasks_.clear();
990+
}
991+
}
992+
968993
/**
969994
* @brief Submit a lazy commit txn task
970995
*
@@ -978,6 +1003,12 @@ std::shared_ptr<TxnLazyCommitTask> TxnLazyCommitter::submit(const std::string& i
9781003
std::shared_ptr<TxnLazyCommitTask> task;
9791004
{
9801005
std::unique_lock<std::mutex> lock(mutex_);
1006+
if (stopped_) {
1007+
task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id, txn_kv_, this);
1008+
task->finish(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is stopped");
1009+
return task;
1010+
}
1011+
9811012
auto iter = running_tasks_.find(txn_id);
9821013
if (iter != running_tasks_.end()) {
9831014
return iter->second;
@@ -986,13 +1017,17 @@ std::shared_ptr<TxnLazyCommitTask> TxnLazyCommitter::submit(const std::string& i
9861017
task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id, txn_kv_, this);
9871018
running_tasks_.emplace(txn_id, task);
9881019
g_bvar_txn_lazy_committer_submitted << 1;
989-
}
9901020

991-
worker_pool_->submit([task]() {
992-
task->commit();
993-
task->txn_lazy_committer_->remove(task->txn_id_);
994-
g_bvar_txn_lazy_committer_finished << 1;
995-
});
1021+
int ret = worker_pool_->submit([task]() {
1022+
task->commit();
1023+
task->txn_lazy_committer_->remove(task->txn_id_);
1024+
g_bvar_txn_lazy_committer_finished << 1;
1025+
});
1026+
if (ret != 0) {
1027+
running_tasks_.erase(txn_id);
1028+
task->finish(MetaServiceCode::UNDEFINED_ERR, "failed to submit txn lazy commit task");
1029+
}
1030+
}
9961031
DCHECK(task != nullptr);
9971032
return task;
9981033
}

cloud/src/meta-service/txn_lazy_committer.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ class TxnLazyCommitTask {
4545
private:
4646
friend class TxnLazyCommitter;
4747

48+
// Marks the task as finished with the final result and wakes all waiters.
49+
// `code` is returned by wait() as the task status, and `msg` carries the
50+
// corresponding error detail or an empty string on success.
51+
void finish(MetaServiceCode code, std::string msg);
52+
4853
std::pair<MetaServiceCode, std::string> commit_partition(
4954
int64_t db_id, int64_t partition_id,
5055
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
@@ -66,6 +71,7 @@ class TxnLazyCommitter {
6671
public:
6772
TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv);
6873
TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv, std::shared_ptr<ResourceManager> resource_mgr);
74+
~TxnLazyCommitter();
6975
std::shared_ptr<TxnLazyCommitTask> submit(const std::string& instance_id, int64_t txn_id);
7076
void remove(int64_t txn_id);
7177

@@ -82,5 +88,6 @@ class TxnLazyCommitter {
8288
std::mutex mutex_;
8389
// <txn_id, TxnLazyCommitTask>
8490
std::unordered_map<int64_t, std::shared_ptr<TxnLazyCommitTask>> running_tasks_;
91+
bool stopped_ = false;
8592
};
86-
} // namespace doris::cloud
93+
} // namespace doris::cloud

0 commit comments

Comments
 (0)