Skip to content

Commit 10a9442

Browse files
authored
Merge pull request #30080 from vbotbuildovich/backport-pr-30071-v26.1.x-497
[v26.1.x] group/tx/stm: limit local snapshot to max_removable_local_log_offset
2 parents e47ff1b + b2a1256 commit 10a9442

4 files changed

Lines changed: 285 additions & 5 deletions

File tree

src/v/kafka/server/group_tx_tracker_stm.cc

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,32 @@ group_tx_tracker_stm::apply_local_snapshot(
162162

163163
ss::future<raft::stm_snapshot>
164164
group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
165-
// Copy over the snapshot state for a consistent view.
166-
auto offset = last_applied_offset();
165+
// Snapshot at max_removable_local_log_offset with open transaction state
166+
// stripped. This prevents a bug where:
167+
//
168+
// 1. Snapshot captures an open tx (begin_offset in per_group_state)
169+
// 2. The tx later commits, max_removable advances
170+
// 3. Compaction removes the commit batch from the log
171+
// 4. On restart, the stale open tx is loaded but its commit is gone
172+
// -> max_removable stuck permanently
173+
//
174+
// By definition, all open txs have begin_offset > max_removable, so they
175+
// are all past the snapshot offset. On replay from offset+1, their fence
176+
// batches are guaranteed to still be in the log (compaction is bounded by
177+
// max_removable while the STM is live) and will re-establish the open tx
178+
// state. Group existence (keys in _all_txs) must be preserved because
179+
// maybe_add_tx_begin_offset() ignores fences for unknown groups, and the
180+
// group_metadata batches that created them may be before the snapshot
181+
// offset.
182+
auto offset = max_removable_local_log_offset();
183+
184+
all_txs_t snap_txs;
185+
for (const auto& [gid, _] : _all_txs) {
186+
snap_txs[gid] = per_group_state{};
187+
}
188+
167189
snapshot snap{
168-
.transactions{_all_txs},
190+
.transactions{std::move(snap_txs)},
169191
.blocked_groups{
170192
std::from_range, _group_blocks | std::views::filter([](const auto& e) {
171193
return e.second.is_blocked;

src/v/kafka/server/group_tx_tracker_stm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class group_tx_tracker_stm final
136136
}
137137

138138
private:
139-
static constexpr int8_t supported_local_snapshot_version = 1;
139+
static constexpr int8_t supported_local_snapshot_version = 2;
140140
struct snapshot
141141
: serde::envelope<snapshot, serde::version<2>, serde::compat_version<0>> {
142142
all_txs_t transactions;

src/v/kafka/server/tests/BUILD

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ redpanda_cc_btest(
735735

736736
redpanda_cc_gtest(
737737
name = "group_tx_compaction_test",
738-
timeout = "moderate",
738+
timeout = "short",
739739
srcs = [
740740
"group_tx_compaction_test.cc",
741741
],
@@ -747,6 +747,7 @@ redpanda_cc_gtest(
747747
"//src/v/model/tests:random",
748748
"//src/v/raft",
749749
"//src/v/redpanda/tests:fixture",
750+
"//src/v/ssx:semaphore",
750751
"//src/v/storage",
751752
"//src/v/test_utils:gtest",
752753
"//src/v/test_utils:scoped_config",

src/v/kafka/server/tests/group_tx_compaction_test.cc

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "model/record_batch_types.h"
1414
#include "model/tests/randoms.h"
1515
#include "redpanda/tests/fixture.h"
16+
#include "ssx/semaphore.h"
1617
#include "storage/types.h"
1718
#include "test_utils/async.h"
1819
#include "test_utils/scoped_config.h"
@@ -514,3 +515,259 @@ INSTANTIATE_TEST_SUITE_P(
514515
.num_tx_per_group = num_tx_per_group,
515516
.num_rolls = num_rolls,
516517
.tx_workload_type = workload_parameters::mixed}));
518+
519+
// Regression test for a bug where compaction removes a commit batch that was
520+
// written after a local STM snapshot was taken while a transaction was open.
521+
//
522+
// The sequence that triggers the bug:
523+
// 1. Transaction begins (fence at offset F), STM records open tx
524+
// 2. Local snapshot is taken at offset >= F but < commit offset C
525+
// (snapshot captures the open tx state)
526+
// 3. Transaction commits (commit at offset C)
527+
// 4. max_removable_offset advances past C (all txs closed)
528+
// 5. Compaction runs with the advanced max_removable_offset, removing the
529+
// commit batch at C from the log segment
530+
// 6. On restart, STM loads the snapshot (tx at F is open), replays the
531+
// compacted log where the commit at C is gone
532+
// 7. The open tx at F is never resolved -> max_removable_offset stuck at
533+
// prev(F) permanently
534+
//
535+
// The fix limits the local snapshot offset to max_removable_local_log_offset -
536+
// the offset before the earliest open transaction - so the snapshot never
537+
// captures open tx state.
538+
//
539+
// Instead of a full node restart, we simulate recovery by: taking the snapshot,
540+
// compacting, then re-applying the snapshot to the STM and replaying batches
541+
// from the compacted log. This is exactly what the STM does on startup.
542+
TEST_F_CORO(
543+
group_manager_fixture, test_stale_open_tx_after_snapshot_and_compaction) {
544+
auto stm = group_tx_stm();
545+
auto log = consumer_offsets_log();
546+
547+
auto wait_until_stm_apply = [&] {
548+
return tests::cooperative_spin_wait_with_timeout(5s, [&] {
549+
return consumer_offsets_log()->offsets().dirty_offset
550+
== group_tx_stm()->last_applied_offset();
551+
});
552+
};
553+
554+
// Step 1: Join a group so the STM starts tracking it.
555+
kafka::group_id gid{"snapshot-compaction-test"};
556+
{
557+
kafka::join_group_request jreq;
558+
jreq.data = kafka::join_group_request_data{
559+
.group_id = gid,
560+
.session_timeout_ms = 300s,
561+
.member_id = kafka::unknown_member_id,
562+
.protocol_type = kafka::protocol_type{"test"},
563+
.protocols = chunked_vector<kafka::join_group_request_protocol>{
564+
{kafka::protocol_name("test"), bytes()}}};
565+
jreq.ntp = offsets_ntp;
566+
auto jresult = co_await join_group(std::move(jreq));
567+
ASSERT_EQ_CORO(jresult.data.error_code, kafka::error_code::none);
568+
569+
kafka::sync_group_request sreq;
570+
sreq.ntp = offsets_ntp;
571+
sreq.data = kafka::sync_group_request_data{
572+
.group_id = gid,
573+
.generation_id = jresult.data.generation_id,
574+
.member_id = jresult.data.member_id,
575+
};
576+
auto sresult = co_await sync_group(std::move(sreq));
577+
ASSERT_EQ_CORO(sresult.data.error_code, kafka::error_code::none);
578+
}
579+
co_await wait_until_stm_apply();
580+
581+
auto pid = model::producer_identity{42, 0};
582+
583+
// Step 2: Run several committed transactions to build up log data so that
584+
// compaction has material to work with.
585+
for (int i = 0; i < 20; i++) {
586+
auto seq = model::tx_seq{i};
587+
auto bresult = co_await begin_tx(
588+
cluster::begin_group_tx_request{
589+
offsets_ntp, gid, pid, seq, no_timeout, model::partition_id{0}});
590+
ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);
591+
592+
kafka::txn_offset_commit_request oreq;
593+
oreq.ntp = offsets_ntp;
594+
oreq.data.transactional_id = "tx.id";
595+
oreq.data.group_id = gid;
596+
oreq.data.producer_id = kafka::producer_id{pid.id};
597+
oreq.data.producer_epoch = pid.epoch;
598+
kafka::txn_offset_commit_request_topic tdata;
599+
tdata.name = test_ntp.tp.topic;
600+
tdata.partitions.push_back(
601+
{.partition_index = model::partition_id{0},
602+
.committed_offset = model::offset{i}});
603+
oreq.data.topics.push_back(std::move(tdata));
604+
auto oresult = co_await tx_offset_commit(std::move(oreq));
605+
ASSERT_FALSE_CORO(oresult.data.errored());
606+
607+
auto cresult = co_await commit_tx(
608+
cluster::commit_group_tx_request{
609+
offsets_ntp, pid, seq, gid, no_timeout});
610+
ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
611+
}
612+
613+
co_await wait_until_stm_apply();
614+
// All txs committed - mrlo should be at committed_offset.
615+
ASSERT_EQ_CORO(
616+
stm->max_removable_local_log_offset(), log->offsets().committed_offset);
617+
618+
// Step 3: Roll the log to create a segment boundary, then begin a new
619+
// transaction. The fence batch pins max_removable_offset.
620+
co_await log->force_roll();
621+
622+
auto open_tx_seq = model::tx_seq{20};
623+
{
624+
auto bresult = co_await begin_tx(
625+
cluster::begin_group_tx_request{
626+
offsets_ntp,
627+
gid,
628+
pid,
629+
open_tx_seq,
630+
no_timeout,
631+
model::partition_id{0}});
632+
ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);
633+
}
634+
co_await wait_until_stm_apply();
635+
636+
// mrlo is now pinned at the offset before the open tx's fence.
637+
auto mrlo_during_open_tx = stm->max_removable_local_log_offset();
638+
ASSERT_LT_CORO(mrlo_during_open_tx, log->offsets().committed_offset);
639+
640+
// Step 4: Capture and persist a local snapshot while the tx is open.
641+
// We save the snapshot data so we can re-persist it just before restart —
642+
// the force_roll in step 6 triggers a background snapshot that would
643+
// otherwise overwrite this one with a clean snapshot (all txs committed).
644+
auto stale_snapshot = co_await stm->take_local_snapshot(
645+
ssx::semaphore_units{});
646+
co_await stm->write_local_snapshot();
647+
648+
// Step 5: Commit the transaction and write more data so that mrlo advances
649+
// past the commit offset, allowing compaction to remove it.
650+
{
651+
kafka::txn_offset_commit_request oreq;
652+
oreq.ntp = offsets_ntp;
653+
oreq.data.transactional_id = "tx.id";
654+
oreq.data.group_id = gid;
655+
oreq.data.producer_id = kafka::producer_id{pid.id};
656+
oreq.data.producer_epoch = pid.epoch;
657+
kafka::txn_offset_commit_request_topic tdata;
658+
tdata.name = test_ntp.tp.topic;
659+
tdata.partitions.push_back(
660+
{.partition_index = model::partition_id{0},
661+
.committed_offset = model::offset{20}});
662+
oreq.data.topics.push_back(std::move(tdata));
663+
auto oresult = co_await tx_offset_commit(std::move(oreq));
664+
ASSERT_FALSE_CORO(oresult.data.errored());
665+
}
666+
{
667+
auto cresult = co_await commit_tx(
668+
cluster::commit_group_tx_request{
669+
offsets_ntp, pid, open_tx_seq, gid, no_timeout});
670+
ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
671+
}
672+
673+
// Run a few more committed transactions so compaction has newer data with
674+
// the same keys, causing the older commit batch to be deduplicated away.
675+
for (int i = 21; i < 40; i++) {
676+
auto seq = model::tx_seq{i};
677+
auto bresult = co_await begin_tx(
678+
cluster::begin_group_tx_request{
679+
offsets_ntp, gid, pid, seq, no_timeout, model::partition_id{0}});
680+
ASSERT_EQ_CORO(bresult.ec, cluster::tx::errc::none);
681+
682+
kafka::txn_offset_commit_request oreq;
683+
oreq.ntp = offsets_ntp;
684+
oreq.data.transactional_id = "tx.id";
685+
oreq.data.group_id = gid;
686+
oreq.data.producer_id = kafka::producer_id{pid.id};
687+
oreq.data.producer_epoch = pid.epoch;
688+
kafka::txn_offset_commit_request_topic tdata;
689+
tdata.name = test_ntp.tp.topic;
690+
tdata.partitions.push_back(
691+
{.partition_index = model::partition_id{0},
692+
.committed_offset = model::offset{i}});
693+
oreq.data.topics.push_back(std::move(tdata));
694+
auto oresult = co_await tx_offset_commit(std::move(oreq));
695+
ASSERT_FALSE_CORO(oresult.data.errored());
696+
697+
auto cresult = co_await commit_tx(
698+
cluster::commit_group_tx_request{
699+
offsets_ntp, pid, seq, gid, no_timeout});
700+
ASSERT_EQ_CORO(cresult.ec, cluster::tx::errc::none);
701+
}
702+
703+
co_await wait_until_stm_apply();
704+
auto mrlo_before_compaction = stm->max_removable_local_log_offset();
705+
ASSERT_EQ_CORO(mrlo_before_compaction, log->offsets().committed_offset);
706+
707+
// Step 6: Roll and compact. Compaction is allowed up to
708+
// max_removable_offset which is now past the commit batch, so the commit
709+
// batch can be removed.
710+
co_await log->flush();
711+
co_await log->force_roll();
712+
713+
ss::abort_source as;
714+
RPTEST_REQUIRE_EVENTUALLY_CORO(30s, [&]() {
715+
auto collect_offset
716+
= log->stm_hookset()->max_removable_local_log_offset();
717+
return log->apply_segment_ms().then([&, collect_offset] {
718+
auto cfg = storage::housekeeping_config::make_config(
719+
model::timestamp::max(),
720+
std::nullopt,
721+
collect_offset,
722+
collect_offset,
723+
collect_offset,
724+
std::nullopt,
725+
std::chrono::milliseconds{0},
726+
std::chrono::milliseconds{0},
727+
as);
728+
return log->housekeeping(std::move(cfg))
729+
.handle_exception_type(
730+
[](const storage::segment_closed_exception&) {})
731+
.then([&] {
732+
// Compact until the old segments are fully compacted
733+
// (2 segments: one compacted + one active).
734+
return log->segment_count() == 2;
735+
});
736+
});
737+
});
738+
739+
// Step 7: Re-persist the snapshot captured in step 4 (while the tx was
740+
// open). This simulates the real-world scenario where the last snapshot on
741+
// disk was taken while a transaction was open — background snapshotting
742+
// during compaction would otherwise overwrite it with a clean one.
743+
co_await stm->apply_local_snapshot(
744+
stale_snapshot.header, stale_snapshot.data.copy());
745+
co_await stm->write_local_snapshot();
746+
747+
    // Step 8: Restart the node. restart() resets these references, so null
    // them first to avoid a use-after-free.
748+
stm = nullptr;
749+
log = nullptr;
750+
// restart() uses blocking .get() calls, so wrap in ss::async.
751+
co_await ss::async([this] { restart(should_wipe::no); });
752+
co_await wait_for_leader(offsets_ntp);
753+
RPTEST_REQUIRE_EVENTUALLY_CORO(10s, [this] {
754+
auto& gm = app._group_manager;
755+
auto [ec, _] = gm.local().list_groups();
756+
return ec == kafka::error_code::none
757+
&& gm.local().attached_partitions_count() == 1;
758+
});
759+
760+
// Refresh references after restart.
761+
stm = group_tx_stm();
762+
log = consumer_offsets_log();
763+
co_await wait_until_stm_apply();
764+
765+
// Step 9: Verify that max_removable_offset is at committed_offset, not
766+
// stuck at the pre-commit value from the stale snapshot.
767+
auto mrlo_after_restart = stm->max_removable_local_log_offset();
768+
ASSERT_EQ_CORO(mrlo_after_restart, log->offsets().committed_offset)
769+
<< "max_removable_offset is stuck at " << mrlo_after_restart
770+
<< " instead of committed_offset " << log->offsets().committed_offset
771+
<< ". This indicates a stale open transaction from the snapshot was not "
772+
"resolved after compaction removed the commit batch.";
773+
}

0 commit comments

Comments
 (0)