Skip to content

Commit c32e57b

Browse files
committed
group/tx/stm: limit local snapshot to max_removable_local_log_offset
The group_tx_tracker_stm snapshot could capture open transaction state (begin_offsets/producer_states) at an offset where the corresponding commit batch had not yet been written. If compaction later removed that commit batch — which is allowed once max_removable_offset advances past it — the open transaction could never be resolved on restart, permanently blocking max_removable_offset and preventing further compaction. The sequence: 1. Snapshot taken while a tx is open (fence at F, snapshot offset >= F) 2. Tx commits at offset C, max_removable advances past C 3. Compaction removes the commit batch at C 4. On restart, snapshot loads stale open tx at F, replay cannot find the commit -> max_removable stuck at prev(F) forever Fix: snapshot at max_removable_local_log_offset with an empty transactions map. Since this STM's sole purpose is tracking open transactions for max_removable_local_log_offset, and closed transactions leave no state, all meaningful state can be reconstructed from log replay. Open transactions are re-discovered from fence batches in the log, which are guaranteed to be present since compaction is bounded by max_removable while the STM is live. Also adds a regression test that reproduces the scenario by taking a snapshot during an open tx, committing, compacting, re-persisting the stale snapshot, and restarting. To fix existing setups that have stale snapshots, this commit also bumps supported_local_snapshot_version, this invalidates saved snapshots upon upgrade and applies everything from log and reconstructs the correct snapshots the next time with the newer logic.
1 parent 784303b commit c32e57b

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

src/v/kafka/server/group_tx_tracker_stm.cc

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,28 @@ group_tx_tracker_stm::apply_local_snapshot(
161161

162162
ss::future<raft::stm_snapshot>
163163
group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
164-
// Copy over the snapshot state for a consistent view.
165-
auto offset = last_applied_offset();
164+
// Snapshot at max_removable_local_log_offset and exclude open transaction
165+
// state. This STM's sole purpose is tracking open transactions for
166+
// max_removable_local_log_offset; closed transactions leave no state.
167+
//
168+
// By definition, all open txs have begin_offset > max_removable, so they
169+
// are all past the snapshot offset. Including them risks a bug where:
170+
//
171+
// 1. Snapshot captures an open tx (begin_offset in per_group_state)
172+
// 2. The tx later commits, max_removable advances
173+
// 3. Compaction removes the commit batch from the log
174+
// 4. On restart, the stale open tx is loaded but its commit is gone
175+
// -> max_removable stuck permanently
176+
//
177+
// All state (group existence, open txs) is reconstructed from log replay.
178+
// We snapshot an empty transactions map; on replay from max_removable+1,
179+
// group metadata batches re-create groups and fence batches re-establish
180+
// open transactions. Their batches are guaranteed to be in the log since
181+
// compaction is bounded by max_removable while the STM is live.
182+
auto offset = max_removable_local_log_offset();
183+
166184
snapshot snap{
167-
.transactions{_all_txs},
185+
.transactions{},
168186
.blocked_groups{
169187
std::from_range, _group_blocks | std::views::filter([](const auto& e) {
170188
return e.second.is_blocked;

src/v/kafka/server/group_tx_tracker_stm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class group_tx_tracker_stm final
136136
}
137137

138138
private:
139-
static constexpr int8_t supported_local_snapshot_version = 1;
139+
static constexpr int8_t supported_local_snapshot_version = 2;
140140
struct snapshot
141141
: serde::envelope<snapshot, serde::version<2>, serde::compat_version<0>> {
142142
all_txs_t transactions;

0 commit comments

Comments
 (0)