66 changes: 52 additions & 14 deletions src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc
@@ -818,18 +818,38 @@ TEST_F_CORO(
     // This is where the bug manifests: Node 0 has stale in-memory window [11,
     // 12]
     node0.raft()->unblock_new_leadership();
+
+    // Wait for all nodes to catch up before transferring leadership.
+    // The replication above may have achieved majority without the target
+    // node, and the transfer will fail if the target hasn't caught up.
+    co_await wait_for_committed_offset(node1.raft()->committed_offset(), 10s);
+
     vlog(
       ct::cd_log.info,
       "Transferring leadership back to Node {}",
       initial_leader_id);
-    co_await node1.raft()->transfer_leadership(
-      raft::transfer_leadership_request{
-        .group = node1.raft()->group(),
-        .target = initial_leader_id,
-        .timeout = 10s});
+    // Retry the transfer since it can transiently fail if the target
+    // node's follower state hasn't been fully updated yet.
+    auto final_leader_id = model::node_id{};
+    for (int attempt = 0; attempt < 5; ++attempt) {
+        co_await node1.raft()->transfer_leadership(
+          raft::transfer_leadership_request{
+            .group = node1.raft()->group(),
+            .target = initial_leader_id,
+            .timeout = 10s});
 
-    co_await wait_for_leader(10s);
-    auto final_leader_id = *get_leader();
+        co_await wait_for_leader(10s);
+        final_leader_id = *get_leader();
+        if (final_leader_id == initial_leader_id) {
+            break;
+        }
+        vlog(
+          ct::cd_log.info,
+          "Transfer attempt {} landed on node {}, retrying",
+          attempt,
+          final_leader_id);
+    }
     vlog(ct::cd_log.info, "Final leader: {}", final_leader_id);
     ASSERT_EQ_CORO(final_leader_id, initial_leader_id)
       << "Leadership should have transferred back to original leader";
@@ -931,14 +951,32 @@ TEST_F_CORO(

     // Step 6: Transfer leadership back to Node0.
     node0.raft()->unblock_new_leadership();
-    co_await node1.raft()->transfer_leadership(
-      raft::transfer_leadership_request{
-        .group = node1.raft()->group(),
-        .target = initial_leader_id,
-        .timeout = 10s});
 
-    co_await wait_for_leader(10s);
-    ASSERT_EQ_CORO(*get_leader(), initial_leader_id);
+    // Wait for all nodes to catch up before transferring leadership.
+    co_await wait_for_committed_offset(node1.raft()->committed_offset(), 10s);
+
+    // Retry the transfer since it can transiently fail if the target
+    // node's follower state hasn't been fully updated yet.
+    auto final_leader_id = model::node_id{};
+    for (int attempt = 0; attempt < 5; ++attempt) {
+        co_await node1.raft()->transfer_leadership(
+          raft::transfer_leadership_request{
+            .group = node1.raft()->group(),
+            .target = initial_leader_id,
+            .timeout = 10s});
+
+        co_await wait_for_leader(10s);
+        final_leader_id = *get_leader();
+        if (final_leader_id == initial_leader_id) {
+            break;
+        }
+        vlog(
+          ct::cd_log.info,
+          "Transfer attempt {} landed on node {}, retrying",
+          attempt,
+          final_leader_id);
+    }
+    ASSERT_EQ_CORO(final_leader_id, initial_leader_id);
 
     // Step 7: Try to fence epoch 5 on the returned leader.
     // Applied window is now [7, 8] so this must be rejected.
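
The wait_for_committed_offset call added in both tests acts as a catch-up barrier: the preceding replication can commit on a majority that excludes the target node, and a transfer to a lagging target fails. A plausible shape for such a barrier is sketched below, assuming a polling utility along the lines of tests::cooperative_spin_wait_with_timeout; the fixture's actual implementation may differ.

    // Hypothetical catch-up barrier: poll each node until its committed
    // offset reaches the target, failing on timeout. Illustrative only.
    ss::future<> wait_for_committed_offset_sketch(
      std::vector<raft_node*> nodes,
      model::offset target,
      std::chrono::seconds timeout) {
        for (auto* node : nodes) {
            co_await tests::cooperative_spin_wait_with_timeout(
              timeout, [node, target] {
                  return node->raft()->committed_offset() >= target;
              });
        }
    }

Since both tests now repeat the same barrier-plus-retry sequence, a follow-up could factor it into a shared fixture helper.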