Skip to content

Commit f6499b1

Browse files
visualYJDketor
authored andcommitted
[fix][store] Mark rollback when lock miss.
1 parent 65d3300 commit f6499b1

4 files changed

Lines changed: 96 additions & 36 deletions

File tree

src/engine/mono_store_engine.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,11 +677,12 @@ butil::Status MonoStoreEngine::TxnWriter::TxnCheckTxnStatus(std::shared_ptr<Cont
677677
butil::Status MonoStoreEngine::TxnWriter::TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region,
678678
int64_t start_ts,
679679
const std::vector<std::string>& keys) {
680-
return TxnEngineHelper::TxnCheckSecondaryLocks(txn_writer_raw_engine_, ctx, region, start_ts, keys);
680+
return TxnEngineHelper::TxnCheckSecondaryLocks(txn_writer_raw_engine_, mono_engine_, ctx, region, start_ts, keys);
681681
}
682682

683683
butil::Status MonoStoreEngine::TxnWriter::TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts,
684-
int64_t commit_ts, const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) {
684+
int64_t commit_ts, const std::vector<std::string>& keys,
685+
const std::map<int64_t, int64_t>& txn_infos) {
685686
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, mono_engine_, ctx, start_ts, commit_ts, keys, txn_infos);
686687
}
687688

src/engine/raft_store_engine.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -658,11 +658,12 @@ butil::Status RaftStoreEngine::TxnWriter::TxnCheckTxnStatus(std::shared_ptr<Cont
658658
butil::Status RaftStoreEngine::TxnWriter::TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region,
659659
int64_t start_ts,
660660
const std::vector<std::string>& keys) {
661-
return TxnEngineHelper::TxnCheckSecondaryLocks(txn_writer_raw_engine_, ctx, region, start_ts, keys);
661+
return TxnEngineHelper::TxnCheckSecondaryLocks(txn_writer_raw_engine_, raft_engine_, ctx, region, start_ts, keys);
662662
}
663663

664664
butil::Status RaftStoreEngine::TxnWriter::TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts,
665-
int64_t commit_ts, const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) {
665+
int64_t commit_ts, const std::vector<std::string>& keys,
666+
const std::map<int64_t, int64_t>& txn_infos) {
666667
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, raft_engine_, ctx, start_ts, commit_ts, keys, txn_infos);
667668
}
668669

src/engine/txn_engine_helper.cc

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3394,7 +3394,7 @@ butil::Status TxnEngineHelper::CheckTxnStatus(RawEnginePtr raw_engine, std::shar
33943394
Helper::StringToHex(primary_key))
33953395
<< ", region_epoch: " << ctx->RegionEpoch().ShortDebugString() << ", lock_ts: " << lock_ts
33963396
<< ", caller_start_ts: " << caller_start_ts << ", current_ts: " << current_ts
3397-
<< ", force_sync_commit: " << force_sync_commit;
3397+
<< ", force_sync_commit: " << force_sync_commit << ", rollback_if_not_exist: " << rollback_if_not_exist;
33983398

33993399
// we need to do if primay_key is in this region'range in service before apply to raft state machine
34003400
// use reader to get if the lock is exists, if lock is exists, check if the lock is expired its ttl, if expired do
@@ -3460,9 +3460,9 @@ butil::Status TxnEngineHelper::CheckTxnStatus(RawEnginePtr raw_engine, std::shar
34603460
if (force_sync_commit) {
34613461
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
34623462
<< fmt::format("[txn][region({})] CheckTxnStatus,", region->Id())
3463-
<< "fallback is set, check_txn_status treats it as a non-async-commit txn" << "primary_key: " << primary_key
3464-
<< ", lock_info: " << lock_info.ShortDebugString() << ", lock_ts: " << lock_ts
3465-
<< ", caller_start_ts: " << caller_start_ts << ", current_ts: " << current_ts;
3463+
<< "fallback is set, check_txn_status treats it as a non-async-commit txn"
3464+
<< "primary_key: " << primary_key << ", lock_info: " << lock_info.ShortDebugString()
3465+
<< ", lock_ts: " << lock_ts << ", caller_start_ts: " << caller_start_ts << ", current_ts: " << current_ts;
34663466
} else {
34673467
*txn_result->mutable_locked() = lock_info;
34683468
// async-commit locks can't be resolved until they expire.
@@ -3552,6 +3552,7 @@ butil::Status TxnEngineHelper::CheckTxnStatus(RawEnginePtr raw_engine, std::shar
35523552
// lock is expired, do rollback
35533553
std::vector<std::string> keys_to_rollback_with_data;
35543554
std::vector<std::string> keys_to_rollback_without_data;
3555+
std::vector<std::string> keys_miss_lock_to_rollback;
35553556
if (lock_info.short_value().empty()) {
35563557
keys_to_rollback_with_data.push_back(primary_key);
35573558
} else {
@@ -3560,8 +3561,8 @@ butil::Status TxnEngineHelper::CheckTxnStatus(RawEnginePtr raw_engine, std::shar
35603561
DINGO_LOG(INFO) << fmt::format("[txn][region({})] CheckTxnStatus,", region->Id())
35613562
<< ", do rollback, primary_key: " << Helper::StringToHex(primary_key) << ", lock_ts: " << lock_ts
35623563
<< ", lock_info: " << lock_info.ShortDebugString();
3563-
auto ret =
3564-
DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, lock_ts);
3564+
auto ret = DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data,
3565+
keys_miss_lock_to_rollback, lock_ts);
35653566
if (!ret.ok()) {
35663567
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] CheckTxnStatus,", region->Id())
35673568
<< ", rollback failed, primary_key: " << Helper::StringToHex(primary_key)
@@ -3693,6 +3694,7 @@ butil::Status TxnEngineHelper::BatchRollback(RawEnginePtr raw_engine, std::share
36933694

36943695
std::vector<std::string> keys_to_rollback_with_data;
36953696
std::vector<std::string> keys_to_rollback_without_data;
3697+
std::vector<std::string> keys_miss_lock_to_rollback;
36963698
for (const auto &key : keys) {
36973699
pb::store::LockInfo lock_info;
36983700
auto ret = txn_reader.GetLockInfo(key, lock_info);
@@ -3702,14 +3704,44 @@ butil::Status TxnEngineHelper::BatchRollback(RawEnginePtr raw_engine, std::share
37023704
<< ", status: " << ret.error_str();
37033705
}
37043706

3705-
// if lock is not exist, nothing to do
3707+
// when concurrency prewrite, prewrite secondary key success,but prewrite primary meet write conflict,rollback
3708+
// transaction,primary key lock not exist. we need record rollback for primary key for speeding up transaction
3709+
// conflict handling.
37063710
if (lock_info.primary_lock().empty()) {
3707-
DINGO_LOG(WARNING) << fmt::format("[txn][region({})] BatchRollback", region->Id())
3708-
<< ", txn_not_found with lock_info empty, key: " << Helper::StringToHex(key)
3709-
<< ", start_ts: " << start_ts;
3711+
DINGO_LOG(INFO) << fmt::format("[txn][region({})] BatchRollback", region->Id())
3712+
<< ", txn_not_found with lock_info empty, key: " << Helper::StringToHex(key)
3713+
<< ", start_ts: " << start_ts;
37103714

3711-
// auto *txn_not_found = txn_result->mutable_txn_not_found();
3712-
// txn_not_found->set_start_ts(start_ts);
3715+
// the lock is not exists, check if it is rollbacked or committed
3716+
// try to get if there is a rollback to lock_ts
3717+
pb::store::WriteInfo write_info;
3718+
auto ret1 = txn_reader.GetRollbackInfo(start_ts, key, write_info);
3719+
if (!ret1.ok()) {
3720+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] BatchRollback, ", region->Id())
3721+
<< ", get rollback info failed, key: " << Helper::StringToHex(key)
3722+
<< ", start_ts: " << start_ts << ", status: " << ret1.error_str();
3723+
}
3724+
3725+
if (write_info.start_ts() == start_ts) {
3726+
// has been rollbacked.
3727+
continue;
3728+
}
3729+
3730+
// if there is not a rollback to lock_ts, try to get the commit_ts
3731+
int64_t commit_ts = 0;
3732+
auto ret2 =
3733+
txn_reader.GetWriteInfo(start_ts, Constant::kMaxVer, start_ts, key, false, true, true, write_info, commit_ts);
3734+
if (!ret2.ok()) {
3735+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] BatchRollback,", region->Id())
3736+
<< ", get write info failed, key: " << Helper::StringToHex(key) << ", lock_ts: " << start_ts
3737+
<< ", status: " << ret2.error_str();
3738+
}
3739+
3740+
if (commit_ts == 0) {
3741+
keys_miss_lock_to_rollback.push_back(key);
3742+
DINGO_LOG(INFO) << fmt::format("[txn][region({})] BatchRollback,start_ts:{} key:{} MarkRollBackOnMissingLock.",
3743+
region->Id(), start_ts, Helper::StringToHex(key));
3744+
}
37133745
continue;
37143746
}
37153747

@@ -3752,8 +3784,8 @@ butil::Status TxnEngineHelper::BatchRollback(RawEnginePtr raw_engine, std::share
37523784
}
37533785

37543786
// do rollback
3755-
auto ret =
3756-
DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, start_ts);
3787+
auto ret = DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data,
3788+
keys_miss_lock_to_rollback, start_ts);
37573789
if (!ret.ok()) {
37583790
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchRollback, ", region->Id())
37593791
<< ", rollback failed, status: " << ret.error_str();
@@ -3769,14 +3801,17 @@ bvar::LatencyRecorder g_txn_do_rollback_latency("dingo_txn_do_rollback");
37693801
butil::Status TxnEngineHelper::DoRollback(RawEnginePtr /*raw_engine*/, std::shared_ptr<Engine> raft_engine,
37703802
std::shared_ptr<Context> ctx,
37713803
std::vector<std::string> &keys_to_rollback_with_data,
3772-
std::vector<std::string> &keys_to_rollback_without_data, int64_t start_ts) {
3804+
std::vector<std::string> &keys_to_rollback_without_data,
3805+
std::vector<std::string> &keys_miss_lock_to_rollback, int64_t start_ts) {
37733806
BvarLatencyGuard bvar_guard(&g_txn_do_rollback_latency);
37743807

37753808
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
37763809
<< "[txn]Rollback start_ts: " << start_ts << ", keys_count_with_data: " << keys_to_rollback_with_data.size()
3777-
<< ", keys_count_without_data: " << keys_to_rollback_without_data.size();
3810+
<< ", keys_count_without_data: " << keys_to_rollback_without_data.size()
3811+
<< ", keys_miss_lock_to_rollback:" << keys_miss_lock_to_rollback.size();
37783812

3779-
if (keys_to_rollback_without_data.empty() && keys_to_rollback_with_data.empty()) {
3813+
if (keys_to_rollback_without_data.empty() && keys_to_rollback_with_data.empty() &&
3814+
keys_miss_lock_to_rollback.empty()) {
37803815
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << "[txn]Rollback nothing to do, start_ts: " << start_ts;
37813816
return butil::Status::OK();
37823817
}
@@ -3818,6 +3853,18 @@ butil::Status TxnEngineHelper::DoRollback(RawEnginePtr /*raw_engine*/, std::shar
38183853
kv_puts_write.emplace_back(kv);
38193854
}
38203855

3856+
for (const auto &key : keys_miss_lock_to_rollback) {
3857+
// add write
3858+
pb::store::WriteInfo write_info;
3859+
write_info.set_start_ts(start_ts);
3860+
write_info.set_op(::dingodb::pb::store::Op::Rollback);
3861+
3862+
pb::common::KeyValue kv;
3863+
kv.set_key(mvcc::Codec::EncodeKey(key, start_ts));
3864+
kv.set_value(write_info.SerializeAsString());
3865+
kv_puts_write.emplace_back(kv);
3866+
}
3867+
38213868
if (kv_puts_write.empty() && kv_deletes_lock.empty() && kv_deletes_data.empty()) {
38223869
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << "[txn]Rollback nothing to do, start_ts: " << start_ts;
38233870
return butil::Status::OK();
@@ -3908,9 +3955,9 @@ bvar::LatencyRecorder g_txn_check_secondary_locks_latency("dingo_txn_check_secon
39083955
///
39093956
/// If all prewritten locks exist, the lock information is returned.
39103957
/// Otherwise, it returns the commit timestamp of the transaction.
3911-
butil::Status TxnEngineHelper::TxnCheckSecondaryLocks(RawEnginePtr raw_engine, std::shared_ptr<Context> ctx,
3912-
store::RegionPtr region, int64_t start_ts,
3913-
const std::vector<std::string> &keys) {
3958+
butil::Status TxnEngineHelper::TxnCheckSecondaryLocks(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
3959+
std::shared_ptr<Context> ctx, store::RegionPtr region,
3960+
int64_t start_ts, const std::vector<std::string> &keys) {
39143961
BvarLatencyGuard bvar_guard(&g_txn_check_secondary_locks_latency);
39153962

39163963
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
@@ -3992,14 +4039,19 @@ butil::Status TxnEngineHelper::TxnCheckSecondaryLocks(RawEnginePtr raw_engine, s
39924039
}
39934040

39944041
if (commit_ts == 0) {
3995-
// it seems there is a lock previously exists, but it is not committed, and there is no rollback, there must
3996-
// be some error, return TxnNotFound
4042+
auto ret3 = MarkRollBackOnMissingLock(raw_engine, raft_engine, ctx, key, start_ts);
4043+
if (!ret3.ok()) {
4044+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] CheckTxnStatus", region->Id())
4045+
<< ", MarkRollBackOnMissingLock failed";
4046+
return butil::Status(pb::error::Errno::EINTERNAL, "MarkRollBackOnMissingLock failed");
4047+
}
4048+
39974049
auto *txn_not_found = txn_result->mutable_txn_not_found();
39984050
txn_not_found->set_primary_key(key);
39994051
txn_not_found->set_start_ts(start_ts);
4000-
DINGO_LOG(ERROR)
4052+
DINGO_LOG(WARNING)
40014053
<< fmt::format("[txn][region({})] CheckSecondaryLocks,", region->Id())
4002-
<< ", cannot found the transaction, maybe some error ocurred, return txn_not_found, secondary_key: "
4054+
<< ", cannot found the transaction, mark rollback on missing lock, return txn_not_found, secondary_key: "
40034055
<< Helper::StringToHex(key) << ", start_ts: " << start_ts
40044056
<< ", lock_info: " << lock_info.ShortDebugString();
40054057
return butil::Status::OK();
@@ -4026,6 +4078,7 @@ butil::Status TxnEngineHelper::BatchResolveLock(RawEnginePtr raw_engine, std::sh
40264078
std::vector<pb::store::LockInfo> lock_infos_to_commit;
40274079
std::vector<std::string> keys_to_rollback_with_data;
40284080
std::vector<std::string> keys_to_rollback_without_data;
4081+
std::vector<std::string> keys_miss_lock_to_rollback;
40294082

40304083
auto stream = Stream::New(FLAGS_stream_message_max_limit_size);
40314084
std::vector<pb::store::LockInfo> tmp_lock_infos;
@@ -4087,8 +4140,8 @@ butil::Status TxnEngineHelper::BatchResolveLock(RawEnginePtr raw_engine, std::sh
40874140
DINGO_LOG(INFO) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id()) << "primary key:" << key
40884141
<< ", do rollback without data, start_ts: " << start_ts;
40894142
}
4090-
auto ret =
4091-
DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, start_ts);
4143+
auto ret = DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data,
4144+
keys_miss_lock_to_rollback, start_ts);
40924145
if (!ret.ok()) {
40934146
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id())
40944147
<< ", rollback failed, start_ts: " << start_ts << ", status: " << ret.error_str();
@@ -4153,6 +4206,7 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
41534206
std::vector<pb::store::LockInfo> lock_infos_to_commit;
41544207
std::vector<std::string> keys_to_rollback_with_data;
41554208
std::vector<std::string> keys_to_rollback_without_data;
4209+
std::vector<std::string> keys_miss_lock_to_rollback;
41564210

41574211
// if keys is not empty, we only do resolve lock for these keys
41584212
if (!keys.empty()) {
@@ -4266,8 +4320,8 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
42664320
DINGO_LOG(INFO) << fmt::format("[txn][region({})] ResolveLock, ", region->Id()) << "primary key:" << key
42674321
<< ", do rollback without data, start_ts: " << start_ts;
42684322
}
4269-
auto ret =
4270-
DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, start_ts);
4323+
auto ret = DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data,
4324+
keys_miss_lock_to_rollback, start_ts);
42714325
if (!ret.ok()) {
42724326
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock, ", region->Id())
42734327
<< ", rollback failed, start_ts: " << start_ts << ", status: " << ret.error_str();

src/engine/txn_engine_helper.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ class TxnEngineHelper {
158158

159159
static butil::Status DoRollback(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
160160
std::shared_ptr<Context> ctx, std::vector<std::string> &keys_to_rollback_with_data,
161-
std::vector<std::string> &keys_to_rollback_without_data, int64_t start_ts);
161+
std::vector<std::string> &keys_to_rollback_without_data,
162+
std::vector<std::string> &keys_miss_lock_to_rollback, int64_t start_ts);
162163

163164
static butil::Status MarkRollBackOnMissingLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
164165
std::shared_ptr<Context> ctx, std::string primary_key,
@@ -200,15 +201,18 @@ class TxnEngineHelper {
200201
int64_t caller_start_ts, int64_t current_ts, bool force_sync_commit,
201202
bool rollback_if_not_exist);
202203

203-
static butil::Status TxnCheckSecondaryLocks(RawEnginePtr raw_engine, std::shared_ptr<Context> ctx,
204-
store::RegionPtr region, int64_t start_ts,
204+
static butil::Status TxnCheckSecondaryLocks(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
205+
std::shared_ptr<Context> ctx, store::RegionPtr region, int64_t start_ts,
205206
const std::vector<std::string> &keys);
206207

207208
static butil::Status ResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
208209
std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
209210
const std::vector<std::string> &keys, const std::map<int64_t, int64_t> &txn_infos);
210211

211-
static butil::Status BatchResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine, std::shared_ptr<Context> ctx, store::RegionPtr region, const std::map<int64_t, int64_t> &txn_infos, pb::store::TxnResultInfo* txn_result);
212+
static butil::Status BatchResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
213+
std::shared_ptr<Context> ctx, store::RegionPtr region,
214+
const std::map<int64_t, int64_t> &txn_infos,
215+
pb::store::TxnResultInfo *txn_result);
212216

213217
static butil::Status HeartBeat(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
214218
std::shared_ptr<Context> ctx, const std::string &primary_lock, int64_t start_ts,

0 commit comments

Comments
 (0)