Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions src/v/kafka/server/group_tx_tracker_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,32 @@ group_tx_tracker_stm::apply_local_snapshot(

ss::future<raft::stm_snapshot>
group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
// Copy over the snapshot state for a consistent view.
auto offset = last_applied_offset();
// Snapshot at max_removable_local_log_offset with open transaction state
// stripped. This prevents a bug where:
//
// 1. Snapshot captures an open tx (begin_offset in per_group_state)
// 2. The tx later commits, max_removable advances
// 3. Compaction removes the commit batch from the log
// 4. On restart, the stale open tx is loaded but its commit is gone
// -> max_removable stuck permanently
//
// By definition, all open txs have begin_offset > max_removable, so they
// are all past the snapshot offset. On replay from offset+1, their fence
// batches are guaranteed to still be in the log (compaction is bounded by
// max_removable while the STM is live) and will re-establish the open tx
// state. Group existence (keys in _all_txs) must be preserved because
// maybe_add_tx_begin_offset() ignores fences for unknown groups, and the
// group_metadata batches that created them may be before the snapshot
// offset.
auto offset = max_removable_local_log_offset();

all_txs_t snap_txs;
for (const auto& [gid, _] : _all_txs) {
snap_txs[gid] = per_group_state{};
}

snapshot snap{
.transactions{_all_txs},
.transactions{std::move(snap_txs)},
.blocked_groups{
std::from_range, _group_blocks | std::views::filter([](const auto& e) {
return e.second.is_blocked;
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/group_tx_tracker_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class group_tx_tracker_stm final
}

private:
static constexpr int8_t supported_local_snapshot_version = 1;
static constexpr int8_t supported_local_snapshot_version = 2;
struct snapshot
: serde::envelope<snapshot, serde::version<2>, serde::compat_version<0>> {
all_txs_t transactions;
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ redpanda_cc_btest(

redpanda_cc_gtest(
name = "group_tx_compaction_test",
timeout = "moderate",
timeout = "short",
Comment thread
bharathv marked this conversation as resolved.
srcs = [
"group_tx_compaction_test.cc",
],
Expand All @@ -747,6 +747,7 @@ redpanda_cc_gtest(
"//src/v/model/tests:random",
"//src/v/raft",
"//src/v/redpanda/tests:fixture",
"//src/v/ssx:semaphore",
"//src/v/storage",
"//src/v/test_utils:gtest",
"//src/v/test_utils:scoped_config",
Expand Down
257 changes: 257 additions & 0 deletions src/v/kafka/server/tests/group_tx_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "model/record_batch_types.h"
#include "model/tests/randoms.h"
#include "redpanda/tests/fixture.h"
#include "ssx/semaphore.h"
#include "storage/types.h"
#include "test_utils/async.h"
#include "test_utils/scoped_config.h"
Expand Down Expand Up @@ -514,3 +515,259 @@ INSTANTIATE_TEST_SUITE_P(
.num_tx_per_group = num_tx_per_group,
.num_rolls = num_rolls,
.tx_workload_type = workload_parameters::mixed}));

// Regression test for a bug where compaction removes a commit batch that was
// written after a local STM snapshot was taken while a transaction was open.
//
// The sequence that triggers the bug:
// 1. Transaction begins (fence at offset F), STM records open tx
// 2. Local snapshot is taken at offset >= F but < commit offset C
//    (snapshot captures the open tx state)
// 3. Transaction commits (commit at offset C)
// 4. max_removable_offset advances past C (all txs closed)
// 5. Compaction runs with the advanced max_removable_offset, removing the
//    commit batch at C from the log segment
// 6. On restart, STM loads the snapshot (tx at F is open), replays the
//    compacted log where the commit at C is gone
// 7. The open tx at F is never resolved -> max_removable_offset stuck at
//    prev(F) permanently
//
// The fix limits the local snapshot offset to max_removable_local_log_offset
// (the offset just before the earliest open transaction), so the snapshot
// never captures open tx state.
//
// Instead of a full node restart, we simulate recovery by: taking the snapshot,
// compacting, then re-applying the snapshot to the STM and replaying batches
// from the compacted log. This is exactly what the STM does on startup.
TEST_F_CORO(
  group_manager_fixture, test_stale_open_tx_after_snapshot_and_compaction) {
    auto stm = group_tx_stm();
    auto log = consumer_offsets_log();

    // Helper: wait until the STM has applied everything written to the log
    // (dirty offset == last applied), so subsequent STM state reads are
    // consistent with the writes above them.
    auto wait_until_stm_apply = [&] {
        return tests::cooperative_spin_wait_with_timeout(5s, [&] {
            return consumer_offsets_log()->offsets().dirty_offset
                   == group_tx_stm()->last_applied_offset();
        });
    };

    // Step 1: Join a group so the STM starts tracking it.
    kafka::group_id gid{"snapshot-compaction-test"};
    {
        kafka::join_group_request jreq;
        jreq.data = kafka::join_group_request_data{
          .group_id = gid,
          .session_timeout_ms = 300s,
          .member_id = kafka::unknown_member_id,
          .protocol_type = kafka::protocol_type{"test"},
          .protocols = chunked_vector<kafka::join_group_request_protocol>{
            {kafka::protocol_name("test"), bytes()}}};
        jreq.ntp = offsets_ntp;
        auto jresult = co_await join_group(std::move(jreq));
        ASSERT_EQ_CORO(jresult.data.error_code, kafka::error_code::none);

        kafka::sync_group_request sreq;
        sreq.ntp = offsets_ntp;
        sreq.data = kafka::sync_group_request_data{
          .group_id = gid,
          .generation_id = jresult.data.generation_id,
          .member_id = jresult.data.member_id,
        };
        auto sresult = co_await sync_group(std::move(sreq));
        ASSERT_EQ_CORO(sresult.data.error_code, kafka::error_code::none);
    }
    co_await wait_until_stm_apply();

    auto pid = model::producer_identity{42, 0};

    // Step 2: Run several committed transactions to build up log data so that
    // compaction has material to work with.
    for (int i = 0; i < 20; i++) {
        auto seq = model::tx_seq{i};
        auto bresult = co_await begin_tx(
          cluster::begin_group_tx_request{
            offsets_ntp, gid, pid, seq, no_timeout, model::partition_id{0}});
        ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);

        kafka::txn_offset_commit_request oreq;
        oreq.ntp = offsets_ntp;
        oreq.data.transactional_id = "tx.id";
        oreq.data.group_id = gid;
        oreq.data.producer_id = kafka::producer_id{pid.id};
        oreq.data.producer_epoch = pid.epoch;
        kafka::txn_offset_commit_request_topic tdata;
        tdata.name = test_ntp.tp.topic;
        tdata.partitions.push_back(
          {.partition_index = model::partition_id{0},
           .committed_offset = model::offset{i}});
        oreq.data.topics.push_back(std::move(tdata));
        auto oresult = co_await tx_offset_commit(std::move(oreq));
        ASSERT_FALSE_CORO(oresult.data.errored());

        auto cresult = co_await commit_tx(
          cluster::commit_group_tx_request{
            offsets_ntp, pid, seq, gid, no_timeout});
        ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
    }

    co_await wait_until_stm_apply();
    // All txs committed - max_removable_local_log_offset (mrlo) should be at
    // committed_offset.
    ASSERT_EQ_CORO(
      stm->max_removable_local_log_offset(), log->offsets().committed_offset);

    // Step 3: Roll the log to create a segment boundary, then begin a new
    // transaction. The fence batch pins max_removable_offset.
    co_await log->force_roll();

    auto open_tx_seq = model::tx_seq{20};
    {
        auto bresult = co_await begin_tx(
          cluster::begin_group_tx_request{
            offsets_ntp,
            gid,
            pid,
            open_tx_seq,
            no_timeout,
            model::partition_id{0}});
        ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);
    }
    co_await wait_until_stm_apply();

    // mrlo is now pinned at the offset before the open tx's fence.
    auto mrlo_during_open_tx = stm->max_removable_local_log_offset();
    ASSERT_LT_CORO(mrlo_during_open_tx, log->offsets().committed_offset);

    // Step 4: Capture and persist a local snapshot while the tx is open.
    // We save the snapshot data so we can re-persist it just before restart —
    // the force_roll in step 6 triggers a background snapshot that would
    // otherwise overwrite this one with a clean snapshot (all txs committed).
    auto stale_snapshot = co_await stm->take_local_snapshot(
      ssx::semaphore_units{});
    co_await stm->write_local_snapshot();

    // Step 5: Commit the transaction and write more data so that mrlo advances
    // past the commit offset, allowing compaction to remove it.
    {
        kafka::txn_offset_commit_request oreq;
        oreq.ntp = offsets_ntp;
        oreq.data.transactional_id = "tx.id";
        oreq.data.group_id = gid;
        oreq.data.producer_id = kafka::producer_id{pid.id};
        oreq.data.producer_epoch = pid.epoch;
        kafka::txn_offset_commit_request_topic tdata;
        tdata.name = test_ntp.tp.topic;
        tdata.partitions.push_back(
          {.partition_index = model::partition_id{0},
           .committed_offset = model::offset{20}});
        oreq.data.topics.push_back(std::move(tdata));
        auto oresult = co_await tx_offset_commit(std::move(oreq));
        ASSERT_FALSE_CORO(oresult.data.errored());
    }
    {
        auto cresult = co_await commit_tx(
          cluster::commit_group_tx_request{
            offsets_ntp, pid, open_tx_seq, gid, no_timeout});
        ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
    }

    // Run a few more committed transactions so compaction has newer data with
    // the same keys, causing the older commit batch to be deduplicated away.
    for (int i = 21; i < 40; i++) {
        auto seq = model::tx_seq{i};
        auto bresult = co_await begin_tx(
          cluster::begin_group_tx_request{
            offsets_ntp, gid, pid, seq, no_timeout, model::partition_id{0}});
        ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);

        kafka::txn_offset_commit_request oreq;
        oreq.ntp = offsets_ntp;
        oreq.data.transactional_id = "tx.id";
        oreq.data.group_id = gid;
        oreq.data.producer_id = kafka::producer_id{pid.id};
        oreq.data.producer_epoch = pid.epoch;
        kafka::txn_offset_commit_request_topic tdata;
        tdata.name = test_ntp.tp.topic;
        tdata.partitions.push_back(
          {.partition_index = model::partition_id{0},
           .committed_offset = model::offset{i}});
        oreq.data.topics.push_back(std::move(tdata));
        auto oresult = co_await tx_offset_commit(std::move(oreq));
        ASSERT_FALSE_CORO(oresult.data.errored());

        auto cresult = co_await commit_tx(
          cluster::commit_group_tx_request{
            offsets_ntp, pid, seq, gid, no_timeout});
        ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
    }

    co_await wait_until_stm_apply();
    auto mrlo_before_compaction = stm->max_removable_local_log_offset();
    ASSERT_EQ_CORO(mrlo_before_compaction, log->offsets().committed_offset);

    // Step 6: Roll and compact. Compaction is allowed up to
    // max_removable_offset which is now past the commit batch, so the commit
    // batch can be removed.
    co_await log->flush();
    co_await log->force_roll();

    ss::abort_source as;
    RPTEST_REQUIRE_EVENTUALLY_CORO(30s, [&]() {
        auto collect_offset
          = log->stm_hookset()->max_removable_local_log_offset();
        return log->apply_segment_ms().then([&, collect_offset] {
            auto cfg = storage::housekeeping_config::make_config(
              model::timestamp::max(),
              std::nullopt,
              collect_offset,
              collect_offset,
              collect_offset,
              std::nullopt,
              std::chrono::milliseconds{0},
              std::chrono::milliseconds{0},
              as);
            return log->housekeeping(std::move(cfg))
              // A roll may close a segment mid-housekeeping; retry instead
              // of failing the wait loop.
              .handle_exception_type(
                [](const storage::segment_closed_exception&) {})
              .then([&] {
                  // Compact until the old segments are fully compacted
                  // (2 segments: one compacted + one active).
                  return log->segment_count() == 2;
              });
        });
    });

    // Step 7: Re-persist the snapshot captured in step 4 (while the tx was
    // open). This simulates the real-world scenario where the last snapshot on
    // disk was taken while a transaction was open — background snapshotting
    // during compaction would otherwise overwrite it with a clean one.
    co_await stm->apply_local_snapshot(
      stale_snapshot.header, stale_snapshot.data.copy());
    co_await stm->write_local_snapshot();

    // Step 8: Restart so the STM recovers from the persisted (stale) snapshot
    // plus the compacted log.
    // restart() resets them, avoid UAF.
    stm = nullptr;
    log = nullptr;
    // restart() uses blocking .get() calls, so wrap in ss::async.
    co_await ss::async([this] { restart(should_wipe::no); });
    co_await wait_for_leader(offsets_ntp);
    RPTEST_REQUIRE_EVENTUALLY_CORO(10s, [this] {
        auto& gm = app._group_manager;
        auto [ec, _] = gm.local().list_groups();
        return ec == kafka::error_code::none
               && gm.local().attached_partitions_count() == 1;
    });

    // Refresh references after restart.
    stm = group_tx_stm();
    log = consumer_offsets_log();
    co_await wait_until_stm_apply();

    // Step 9: Verify that max_removable_offset is at committed_offset, not
    // stuck at the pre-commit value from the stale snapshot.
    auto mrlo_after_restart = stm->max_removable_local_log_offset();
    ASSERT_EQ_CORO(mrlo_after_restart, log->offsets().committed_offset)
      << "max_removable_offset is stuck at " << mrlo_after_restart
      << " instead of committed_offset " << log->offsets().committed_offset
      << ". This indicates a stale open transaction from the snapshot was not "
         "resolved after compaction removed the commit batch.";
}
Loading