Commit b2a1256

bharathv authored and vbotbuildovich committed
group/tx/stm: limit local snapshot to max_removable_local_log_offset
The group_tx_tracker_stm snapshot could capture open transaction state (begin_offsets/producer_states) at an offset where the corresponding commit batch had not yet been written. If compaction later removed that commit batch, which is allowed once max_removable_offset advances past it, the open transaction could never be resolved on restart, permanently blocking max_removable_offset and preventing further compaction.

The sequence:

1. Snapshot taken while a tx is open (fence at F, snapshot offset >= F)
2. Tx commits at offset C, max_removable advances past C
3. Compaction removes the commit batch at C
4. On restart, snapshot loads stale open tx at F, replay cannot find the commit -> max_removable stuck at prev(F) forever

Fix: snapshot at max_removable_local_log_offset with an empty transactions map. Since this STM's sole purpose is tracking open transactions for max_removable_local_log_offset, and closed transactions leave no state, all meaningful state can be reconstructed from log replay. Open transactions are re-discovered from fence batches in the log, which are guaranteed to be present since compaction is bounded by max_removable while the STM is live.

Also adds a regression test that reproduces the scenario by taking a snapshot during an open tx, committing, compacting, re-persisting the stale snapshot, and restarting.

To fix existing setups that have stale snapshots, this commit also bumps supported_local_snapshot_version. The bump invalidates saved snapshots upon upgrade, so state is rebuilt by applying everything from the log, and the next snapshots are taken with the newer logic.

(cherry picked from commit 45fff0d)
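The four-step failure sequence in the commit message can be modeled with a toy tracker. This is only a sketch: `toy_tracker`, `batch_kind`, and `replay_after_stale_snapshot` are hypothetical names, not Redpanda types, and the rule that max_removable equals the last applied offset when no transaction is open is an assumption made for this model.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>

// Hypothetical, simplified model; none of these types exist in Redpanda.
enum class batch_kind { fence, commit };

struct toy_tracker {
    std::map<int, int64_t> open_txs; // producer id -> begin (fence) offset
    int64_t last_applied = -1;

    void apply(int64_t offset, int producer, batch_kind kind) {
        if (kind == batch_kind::fence) {
            open_txs.emplace(producer, offset);
        } else {
            open_txs.erase(producer);
        }
        last_applied = offset;
    }

    // Assumption for this model: with open transactions, only offsets before
    // the earliest begin offset are removable; otherwise everything applied is.
    int64_t max_removable() const {
        if (open_txs.empty()) {
            return last_applied;
        }
        int64_t min_begin = open_txs.begin()->second;
        for (const auto& [pid, begin] : open_txs) {
            (void)pid;
            min_begin = std::min(min_begin, begin);
        }
        return min_begin - 1;
    }
};

// Replays the sequence with fence F = 5 and commit C = 8, and returns the
// max_removable value observed after the restart.
int64_t replay_after_stale_snapshot() {
    std::map<int64_t, std::pair<int, batch_kind>> log{
      {5, {1, batch_kind::fence}}, {8, {1, batch_kind::commit}}};

    toy_tracker live;
    live.apply(5, 1, batch_kind::fence);
    toy_tracker snapshot = live;          // 1. snapshot while the tx is open
    live.apply(8, 1, batch_kind::commit); // 2. tx commits
    assert(live.max_removable() == 8);    //    max_removable advances past C
    log.erase(8);                         // 3. compaction removes the commit

    toy_tracker restarted = snapshot;     // 4. restart from the stale snapshot
    for (auto it = log.upper_bound(restarted.last_applied); it != log.end();
         ++it) {
        restarted.apply(it->first, it->second.first, it->second.second);
    }
    return restarted.max_removable();
}
```

In this model the restarted tracker never sees the commit, so `replay_after_stale_snapshot()` stays at prev(F), which is offset 4.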
1 parent a9d1ebc commit b2a1256

2 files changed

Lines changed: 26 additions & 4 deletions

src/v/kafka/server/group_tx_tracker_stm.cc

Lines changed: 25 additions & 3 deletions
@@ -161,10 +161,32 @@ group_tx_tracker_stm::apply_local_snapshot(

 ss::future<raft::stm_snapshot>
 group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
-    // Copy over the snapshot state for a consistent view.
-    auto offset = last_applied_offset();
+    // Snapshot at max_removable_local_log_offset with open transaction state
+    // stripped. This prevents a bug where:
+    //
+    // 1. Snapshot captures an open tx (begin_offset in per_group_state)
+    // 2. The tx later commits, max_removable advances
+    // 3. Compaction removes the commit batch from the log
+    // 4. On restart, the stale open tx is loaded but its commit is gone
+    // -> max_removable stuck permanently
+    //
+    // By definition, all open txs have begin_offset > max_removable, so they
+    // are all past the snapshot offset. On replay from offset+1, their fence
+    // batches are guaranteed to still be in the log (compaction is bounded by
+    // max_removable while the STM is live) and will re-establish the open tx
+    // state. Group existence (keys in _all_txs) must be preserved because
+    // maybe_add_tx_begin_offset() ignores fences for unknown groups, and the
+    // group_metadata batches that created them may be before the snapshot
+    // offset.
+    auto offset = max_removable_local_log_offset();
+
+    all_txs_t snap_txs;
+    for (const auto& [gid, _] : _all_txs) {
+        snap_txs[gid] = per_group_state{};
+    }
+
     snapshot snap{
-      .transactions{_all_txs},
+      .transactions{std::move(snap_txs)},
       .blocked_groups{
         std::from_range, _group_blocks | std::views::filter([](const auto& e) {
             return e.second.is_blocked;
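The reason the hunk keeps group keys while dropping per-group state can be sketched in isolation. The types and the `maybe_add_begin` helper below are hypothetical stand-ins that only mimic the behavior the code comment attributes to maybe_add_tx_begin_offset() (fences for unknown groups are ignored). They are not the real Redpanda definitions.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-ins for per_group_state and all_txs_t.
struct toy_group_state {
    std::map<int64_t, int64_t> begin_offsets; // producer id -> fence offset
};
using toy_all_txs = std::map<std::string, toy_group_state>;

// Keep every group key, but reset its state to empty, as the hunk does.
toy_all_txs strip_open_tx_state(const toy_all_txs& live) {
    toy_all_txs stripped;
    for (const auto& [gid, state] : live) {
        (void)state;
        stripped[gid] = toy_group_state{};
    }
    return stripped;
}

// Assumed behavior: a fence for a group that is not tracked is ignored.
bool maybe_add_begin(
  toy_all_txs& txs, const std::string& gid, int64_t pid, int64_t offset) {
    auto it = txs.find(gid);
    if (it == txs.end()) {
        return false; // unknown group: the fence leaves no state behind
    }
    it->second.begin_offsets.try_emplace(pid, offset);
    return true;
}
```

Under these assumptions, a fence replayed against the stripped map re-establishes the open transaction, whereas replaying it against an entirely empty map would drop it silently.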

src/v/kafka/server/group_tx_tracker_stm.h

Lines changed: 1 addition & 1 deletion
@@ -136,7 +136,7 @@ class group_tx_tracker_stm final
     }

 private:
-    static constexpr int8_t supported_local_snapshot_version = 1;
+    static constexpr int8_t supported_local_snapshot_version = 2;
     struct snapshot
       : serde::envelope<snapshot, serde::version<2>, serde::compat_version<0>> {
         all_txs_t transactions;
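The effect of the version bump can be illustrated with a small sketch. It assumes that a stored snapshot is discarded when its version differs from the supported one. The exact comparison used by the real STM is not shown in this diff, and `toy_snapshot`, `toy_supported_version`, and `replay_start_offset` are hypothetical names.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical snapshot header, not the real raft::stm_snapshot.
struct toy_snapshot {
    int8_t version;
    int64_t offset;
};

constexpr int8_t toy_supported_version = 2;

// Returns the offset to resume replay from, or nullopt when the snapshot is
// rejected and the whole log must be replayed instead.
std::optional<int64_t> replay_start_offset(const toy_snapshot& s) {
    if (s.version != toy_supported_version) {
        return std::nullopt; // assumed: mismatched versions are discarded
    }
    return s.offset + 1;
}
```

With this check, a version 1 snapshot written before the upgrade is ignored, so any stale open transaction it holds is never loaded.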
