Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) {
} else {
assert(txn_db_);
s = txn.Prepare();
const bool prepared = s.ok();
std::shared_ptr<const Snapshot> timestamped_snapshot;
if (s.ok()) {
if (thread && FLAGS_create_timestamped_snapshot_one_in &&
Expand Down Expand Up @@ -968,6 +969,47 @@ Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) {
s = txn.Commit();
}
}
if (prepared && !s.ok()) {
Status rollback_s = txn.Rollback();
int rollback_recovery_retries = 0;
int resume_busy_count = 0;
std::string last_resume_status;
if (!rollback_s.ok() && IsErrorInjectedAndRetryable(rollback_s) &&
db_ != nullptr) {
constexpr int kMaxRollbackAfterRecoveryRetries = 100;
constexpr int kRollbackAfterRecoveryRetryIntervalMicros = 10 * 1000;
for (; rollback_recovery_retries < kMaxRollbackAfterRecoveryRetries &&
!rollback_s.ok();
++rollback_recovery_retries) {
const Status resume_s = db_->Resume();
if (resume_s.ok()) {
last_resume_status = resume_s.ToString();
rollback_s = txn.Rollback();
if (rollback_s.ok() || !IsErrorInjectedAndRetryable(rollback_s)) {
break;
}
} else if (!resume_s.IsBusy()) {
last_resume_status = resume_s.ToString();
break;
} else {
++resume_busy_count;
last_resume_status = resume_s.ToString();
}
clock_->SleepForMicroseconds(
kRollbackAfterRecoveryRetryIntervalMicros);
}
}
if (!rollback_s.ok()) {
fprintf(stderr,
"Rollback after failed prepared transaction commit failed: "
"txn=%s, commit_status=%s, rollback_status=%s, "
"recovery_retries=%d, resume_busy_count=%d, "
"last_resume_status=%s\n",
txn.GetName().c_str(), s.ToString().c_str(),
rollback_s.ToString().c_str(), rollback_recovery_retries,
resume_busy_count, last_resume_status.c_str());
}
}
if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
thread->rand.OneInOpt(50000)) {
uint64_t now = db_stress_env->NowNanos();
Expand Down
16 changes: 15 additions & 1 deletion include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class Transaction {
// TransactionOptions.expiration. Status::TxnNotPrepared() may be returned if
// TransactionOptions.skip_prepare is false and Prepare is not called on this
// transaction before Commit.
//
// For TransactionDB transactions using 2PC, if Commit() returns non-OK after
// Prepare() succeeds, the transaction may still need to be resolved. Unless
// the application resolves it another way, call Rollback() before destroying
// the transaction or closing the DB.
virtual Status Commit() = 0;

// In addition to Commit(), also creates a snapshot of the db after all
Expand All @@ -259,7 +264,16 @@ class Transaction {
std::shared_ptr<const Snapshot>* snapshot = nullptr);

// Discard all batched writes in this transaction.
// FIXME: what happens if this isn't called before destruction?
//
// Transactions should normally be completed with Commit() or Rollback()
// before destruction and before closing the DB. Destroying a Transaction
// object releases its in-memory resources, but it is not a substitute for
// resolving a prepared transaction.
//
// Rollback() can return write errors when it needs to write rollback state,
// such as for prepared transactions. If it returns a retryable I/O error,
// recover the DB (for example by calling DB::Resume() when applicable) and
// retry Rollback() before closing the DB.
virtual Status Rollback() = 0;

// Records the state of the transaction for future calls to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed WritePrepared TransactionDB cleanup after retryable commit or rollback write failures so prepared transactions remain rollbackable instead of leaving unresolved prepared state.
6 changes: 6 additions & 0 deletions utilities/transactions/pessimistic_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ Status PessimisticTransaction::Commit() {
if (!s.ok()) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Commit write failed");
// Keep the transaction rollbackable after the commit marker write fails.
txn_state_.store(PREPARED);
return s;
}

Expand Down Expand Up @@ -1003,6 +1005,10 @@ Status PessimisticTransaction::Rollback() {
log_number_);
Clear();
txn_state_.store(ROLLEDBACK);
} else {
// Rollback writes can fail under retryable IO errors. Preserve the state
// so callers can retry after error recovery.
txn_state_.store(PREPARED);
}
} else if (txn_state_ == STARTED) {
if (log_number_ > 0) {
Expand Down
56 changes: 56 additions & 0 deletions utilities/transactions/write_prepared_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2360,6 +2360,62 @@ TEST_P(WritePreparedTransactionTest, Rollback) {
}
}

TEST_P(WritePreparedTransactionTest, RollbackPreparedAfterCommitWriteFailure) {
// This covers the db_stress failure mode where a transaction is already in
// PreparedHeap and the commit marker write hits a retryable IO error. The
// test injects that commit failure and a retryable rollback failure, then
// verifies rollback can still be retried to remove the prepared entry before
// DB teardown.
options.max_bgerror_resume_count = 0;
ASSERT_OK(ReOpen());
fault_fs->SetFileTypesExcludedFromFaultInjection({FileType::kInfoLogFile});

WriteOptions woptions;
woptions.sync = true;
TransactionOptions txn_options;
std::unique_ptr<Transaction> txn(db->BeginTransaction(woptions, txn_options));
ASSERT_NE(nullptr, txn);
ASSERT_OK(txn->SetName("commit_write_failure"));
ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
ASSERT_OK(txn->Prepare());

fault_fs->SetThreadLocalErrorContext(FaultInjectionIOType::kWrite,
/*seed=*/0, /*one_in=*/1,
/*retryable=*/true,
/*has_data_loss=*/false);
fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
const Status commit_s = txn->Commit();
fault_fs->DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);

ASSERT_NOK(commit_s);
ASSERT_TRUE(FaultInjectionTestFS::IsInjectedError(commit_s))
<< commit_s.ToString();
ASSERT_EQ(1, fault_fs->GetAndResetInjectedThreadLocalErrorCount(
FaultInjectionIOType::kWrite));

ASSERT_OK(db->Resume());

fault_fs->SetThreadLocalErrorContext(FaultInjectionIOType::kWrite,
/*seed=*/1, /*one_in=*/1,
/*retryable=*/true,
/*has_data_loss=*/false);
fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
const Status rollback_s = txn->Rollback();
fault_fs->DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);

ASSERT_NOK(rollback_s);
ASSERT_TRUE(FaultInjectionTestFS::IsInjectedError(rollback_s))
<< rollback_s.ToString();
ASSERT_EQ(1, fault_fs->GetAndResetInjectedThreadLocalErrorCount(
FaultInjectionIOType::kWrite));

ASSERT_OK(db->Resume());
ASSERT_OK(txn->Rollback());
txn.reset();

ASSERT_OK(ReOpenNoDelete());
}

TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
// Use large buffer to avoid memtable flush after 1024 insertions
options.write_buffer_size = 1024 * 1024;
Expand Down
Loading