|
18 | 18 | #include <doctest/doctest.h> |
19 | 19 | #undef FAIL |
20 | 20 |
|
| 21 | +#include <condition_variable> |
| 22 | +#include <exception> |
| 23 | +#include <mutex> |
| 24 | +#include <thread> |
| 25 | + |
21 | 26 | using MapT = ccf::kv::Map<size_t, size_t>; |
22 | 27 |
|
23 | 28 | constexpr size_t certificate_validity_period_days = 365; |
@@ -310,6 +315,55 @@ class TestPendingTx : public ccf::kv::PendingTx |
310 | 315 | } |
311 | 316 | }; |
312 | 317 |
|
| 318 | +struct PausedSignatureCommit |
| 319 | +{ |
| 320 | + std::mutex lock; |
| 321 | + std::condition_variable reserved_tx_created_cv; |
| 322 | + std::condition_variable resume_cv; |
| 323 | + bool reserved_tx_created = false; |
| 324 | + bool resume = false; |
| 325 | +}; |
| 326 | + |
| 327 | +class PausedReservedSignatureTx : public ccf::kv::PendingTx |
| 328 | +{ |
| 329 | + ccf::TxID txid; |
| 330 | + ccf::kv::Store& store; |
| 331 | + ccf::Signatures signatures; |
| 332 | + ccf::SerialisedMerkleTree serialised_tree; |
| 333 | + PausedSignatureCommit& paused; |
| 334 | + |
| 335 | +public: |
| 336 | + PausedReservedSignatureTx( |
| 337 | + ccf::TxID txid_, ccf::kv::Store& store_, PausedSignatureCommit& paused_) : |
| 338 | + txid(txid_), |
| 339 | + store(store_), |
| 340 | + signatures(ccf::Tables::SIGNATURES), |
| 341 | + serialised_tree(ccf::Tables::SERIALISED_MERKLE_TREE), |
| 342 | + paused(paused_) |
| 343 | + {} |
| 344 | + |
| 345 | + ccf::kv::PendingTxInfo call() override |
| 346 | + { |
| 347 | + auto tx = store.create_reserved_tx(txid); |
| 348 | + auto sig = tx.rw(signatures); |
| 349 | + auto tree = tx.rw(serialised_tree); |
| 350 | + |
| 351 | + sig->put(ccf::PrimarySignature(ccf::kv::test::PrimaryNodeId, txid.seqno)); |
| 352 | + tree->put({}); |
| 353 | + |
| 354 | + { |
| 355 | + std::lock_guard<std::mutex> guard(paused.lock); |
| 356 | + paused.reserved_tx_created = true; |
| 357 | + } |
| 358 | + paused.reserved_tx_created_cv.notify_one(); |
| 359 | + |
| 360 | + std::unique_lock<std::mutex> guard(paused.lock); |
| 361 | + paused.resume_cv.wait(guard, [this]() { return paused.resume; }); |
| 362 | + |
| 363 | + return tx.commit_reserved(); |
| 364 | + } |
| 365 | +}; |
| 366 | + |
313 | 367 | TEST_CASE( |
314 | 368 | "Batches containing but not ending on a committable transaction should not " |
315 | 369 | "halt replication") |
@@ -457,6 +511,87 @@ TEST_CASE( |
457 | 511 | } |
458 | 512 | } |
459 | 513 |
|
| 514 | +TEST_CASE( |
| 515 | + "Reserved signature tx returns no-replicate if rolled back before " |
| 516 | + "commit_reserved") |
| 517 | +{ |
| 518 | + ccf::kv::Store store; |
| 519 | + auto encryptor = std::make_shared<ccf::kv::NullTxEncryptor>(); |
| 520 | + store.set_encryptor(encryptor); |
| 521 | + auto consensus = std::make_shared<ccf::kv::test::PrimaryStubConsensus>(); |
| 522 | + store.set_consensus(consensus); |
| 523 | + constexpr auto store_term = 2; |
| 524 | + store.initialise_term(store_term); |
| 525 | + |
| 526 | + MapT table("public:table"); |
| 527 | + |
| 528 | + INFO("Commit two normal transactions before emitting a signature"); |
| 529 | + { |
| 530 | + auto tx = store.create_tx(); |
| 531 | + auto* txv = tx.rw(table); |
| 532 | + txv->put(0, 1); |
| 533 | + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); |
| 534 | + } |
| 535 | + |
| 536 | + { |
| 537 | + auto tx = store.create_tx(); |
| 538 | + auto* txv = tx.rw(table); |
| 539 | + txv->put(0, 2); |
| 540 | + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); |
| 541 | + } |
| 542 | + |
| 543 | + REQUIRE(store.current_version() == 2); |
| 544 | + |
| 545 | + auto txid = store.next_txid(); |
| 546 | + REQUIRE(txid == ccf::TxID(store_term, 3)); |
| 547 | + |
| 548 | + PausedSignatureCommit paused; |
| 549 | + std::optional<ccf::kv::CommitResult> worker_result; |
| 550 | + |
| 551 | + std::thread worker([&]() { |
| 552 | + worker_result = store.commit( |
| 553 | + txid, |
| 554 | + std::make_unique<PausedReservedSignatureTx>(txid, store, paused), |
| 555 | + true); |
| 556 | + }); |
| 557 | + |
| 558 | + { |
| 559 | + std::unique_lock<std::mutex> guard(paused.lock); |
| 560 | + paused.reserved_tx_created_cv.wait( |
| 561 | + guard, [&paused]() { return paused.reserved_tx_created; }); |
| 562 | + } |
| 563 | + |
| 564 | + const auto new_term = store_term + 1; |
| 565 | + |
| 566 | + INFO( |
| 567 | + "Rollback after create_reserved_tx but before commit_reserved in a new " |
| 568 | + "term"); |
| 569 | + store.rollback({store_term, 1}, new_term); |
| 570 | + REQUIRE(store.commit_view() == new_term); |
| 571 | + REQUIRE(store.current_txid() == ccf::TxID(store_term, 1)); |
| 572 | + |
| 573 | + { |
| 574 | + std::lock_guard<std::mutex> guard(paused.lock); |
| 575 | + paused.resume = true; |
| 576 | + } |
| 577 | + paused.resume_cv.notify_one(); |
| 578 | + worker.join(); |
| 579 | + |
| 580 | + REQUIRE(worker_result.has_value()); |
| 581 | + REQUIRE(worker_result.value() == ccf::kv::CommitResult::FAIL_NO_REPLICATE); |
| 582 | + REQUIRE(store.current_txid() == ccf::TxID(store_term, 1)); |
| 583 | + |
| 584 | + INFO("A normal transaction can still commit after the failed signature path"); |
| 585 | + { |
| 586 | + auto tx = store.create_tx(); |
| 587 | + auto* txv = tx.rw(table); |
| 588 | + txv->put(0, 3); |
| 589 | + REQUIRE(tx.commit() == ccf::kv::CommitResult::SUCCESS); |
| 590 | + } |
| 591 | + |
| 592 | + REQUIRE(store.current_txid() == ccf::TxID(new_term, 2)); |
| 593 | +} |
| 594 | + |
460 | 595 | TEST_CASE( |
461 | 596 | "Check that rollback during replicate does not cause replication halts") |
462 | 597 | { |
|
0 commit comments