Skip to content

Commit a59320d

Browse files
committed
backup: drop stale-generation SQS messages (PR #714, round 5)
Codex P1 round 10 (commit 4290820): HandleMessageData was appending every !sqs|msg|data|... row to the queue's buffer without checking whether the message's QueueGeneration matched the queue's current generation. PurgeQueue and DeleteQueue bump the generation but the affected rows are removed asynchronously by the reaper; a snapshot taken mid-cleanup still carries those stale rows and the encoder would emit them to messages.jsonl, resurrecting purged messages on restore. - Add HandleQueueGen to capture activeGen from !sqs|queue|gen|<encoded> records (decimal-string value, mirrors adapter/sqs_catalog.go's CreateQueue Put). - Track activeGen on sqsQueueState. - flushQueue calls filterStaleGenMessages: drops messages whose QueueGeneration != activeGen and emits an sqs_stale_generation_messages_dropped warning so operators can correlate dump shape with mid-reap snapshot timing. - Safe fallback: activeGen == 0 (no gen record observed) keeps the legacy behaviour, so a backup that lacks the gen record does not silently lose every message. Tests: - TestSQS_StaleGenerationMessagesDropped: live + stale message, asserts only live survives and the warning fires. - TestSQS_StaleGenerationFilterDisabledWithoutGenRecord: no gen record => no filter; preserves the legacy semantics. flushQueue's body is also extracted into writeMessagesJSONL to keep cyclomatic complexity under the project linter's ceiling.
1 parent 1437cdf commit a59320d

2 files changed

Lines changed: 187 additions & 13 deletions

File tree

internal/backup/sqs.go

Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path/filepath"
1111
"sort"
12+
"strconv"
1213
"strings"
1314
"unicode/utf8"
1415

@@ -108,6 +109,18 @@ type sqsQueueState struct {
108109
name string // decoded queue name; populated on meta arrival
109110
meta *sqsQueueMetaPublic
110111
messages []sqsMessageRecord
112+
// activeGen captures the queue's current generation, parsed
113+
// from a !sqs|queue|gen|<encoded> record. PurgeQueue and
114+
// DeleteQueue bump the generation so the reaper can later
115+
// drop residual !sqs|msg|data|<queue><oldGen><...> rows; if
116+
// the snapshot is taken before reaping completes those stale
117+
// rows are still in the keyspace and must NOT be emitted to
118+
// messages.jsonl. Zero means "no gen record observed yet" and
119+
// disables the filter (the live cluster always writes a gen
120+
// record on CreateQueue, so a missing gen at backup time
121+
// means we have an orphan-only queue and the orphan branch
122+
// already drops messages). Codex P1 round 10.
123+
activeGen uint64
111124
// internalBuf accumulates side records in their on-disk shape if
112125
// includeSideRecords is on. Each line is the encoded prefix +
113126
// hex(rest-of-key) + value (b64) — implementation-grade detail
@@ -254,6 +267,28 @@ func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error {
254267
return nil
255268
}
256269

270+
// HandleQueueGen processes one !sqs|queue|gen|<encoded> record. The
271+
// value is a base-10 decimal string holding the queue's current
272+
// generation (mirrors adapter/sqs_catalog.go's CreateQueue Put: the
273+
// live cluster writes strconv.FormatUint(gen, 10)). Capturing
274+
// activeGen lets flushQueue drop messages tagged with older
275+
// generations — those are residual rows left by PurgeQueue /
276+
// DeleteQueue that the reaper has not yet cleaned, and emitting
277+
// them to messages.jsonl would resurrect purged messages on
278+
// restore. Codex P1 round 10.
279+
func (s *SQSEncoder) HandleQueueGen(key, value []byte) error {
280+
encoded, err := stripPrefixSegment(key, []byte(SQSQueueGenPrefix))
281+
if err != nil {
282+
return err
283+
}
284+
gen, err := strconv.ParseUint(strings.TrimSpace(string(value)), 10, 64) //nolint:mnd // 10 == decimal radix
285+
if err != nil {
286+
return errors.Wrap(ErrSQSMalformedKey, err.Error())
287+
}
288+
s.queueState(encoded).activeGen = gen
289+
return nil
290+
}
291+
257292
// HandleMessageData processes one !sqs|msg|data|<encQueue><gen><encMsgID>
258293
// record. The encoded queue segment is parsed out of the key and used as
259294
// the per-queue routing key; the message is buffered until Finalize so it
@@ -359,19 +394,24 @@ func (s *SQSEncoder) flushQueue(st *sqsQueueState) error {
359394
if err := writeFileAtomic(filepath.Join(dir, "_queue.json"), mustMarshalIndent(st.meta)); err != nil {
360395
return err
361396
}
362-
if len(st.messages) > 0 {
363-
sortMessagesForEmit(st.messages)
364-
jl, err := openJSONL(filepath.Join(dir, "messages.jsonl"))
365-
if err != nil {
366-
return err
367-
}
368-
for i := range st.messages {
369-
if err := jl.enc.Encode(st.messages[i]); err != nil {
370-
_ = closeJSONL(jl)
371-
return errors.WithStack(err)
372-
}
373-
}
374-
if err := closeJSONL(jl); err != nil {
397+
// Drop messages tagged with stale generations: PurgeQueue and
398+
// DeleteQueue bump the queue's gen but the reaper deletes the
399+
// affected !sqs|msg|data| rows asynchronously. A snapshot taken
400+
// mid-reap would otherwise resurrect purged messages on restore.
401+
// activeGen == 0 means we did not see a !sqs|queue|gen| record
402+
// for this queue; preserving the legacy behaviour (no filter)
403+
// is the safe fallback because every CreateQueue writes a gen.
404+
// Codex P1 round 10.
405+
visibleMessages, dropped := filterStaleGenMessages(st.messages, st.activeGen)
406+
if dropped > 0 {
407+
s.emitWarn("sqs_stale_generation_messages_dropped",
408+
"queue", st.name,
409+
"active_gen", st.activeGen,
410+
"dropped", dropped,
411+
"hint", "messages with mismatched queue_generation suppressed; reaper had not finished cleanup at snapshot time")
412+
}
413+
if len(visibleMessages) > 0 {
414+
if err := writeMessagesJSONL(dir, visibleMessages); err != nil {
375415
return err
376416
}
377417
}
@@ -677,6 +717,50 @@ func decodeSQSMessageValue(value []byte) (sqsMessageRecord, error) {
677717
}, nil
678718
}
679719

720+
// writeMessagesJSONL emits the buffered (visible) messages to
721+
// messages.jsonl in send-order. Split out of flushQueue to keep that
722+
// function's cyclomatic complexity under the project's linter ceiling.
723+
func writeMessagesJSONL(dir string, msgs []sqsMessageRecord) error {
724+
sortMessagesForEmit(msgs)
725+
jl, err := openJSONL(filepath.Join(dir, "messages.jsonl"))
726+
if err != nil {
727+
return err
728+
}
729+
for i := range msgs {
730+
if err := jl.enc.Encode(msgs[i]); err != nil {
731+
_ = closeJSONL(jl)
732+
return errors.WithStack(err)
733+
}
734+
}
735+
if err := closeJSONL(jl); err != nil {
736+
return err
737+
}
738+
return nil
739+
}
740+
741+
// filterStaleGenMessages partitions in into (visible, droppedCount).
742+
// A message is visible if activeGen is zero (no gen record observed
743+
// for this queue, which means we cannot make a confident call) OR
744+
// its QueueGeneration matches activeGen. Anything else is residual
745+
// state from a PurgeQueue / DeleteQueue that the reaper has not yet
746+
// removed; emitting it would resurrect purged messages on restore.
747+
// Codex P1 round 10.
748+
func filterStaleGenMessages(in []sqsMessageRecord, activeGen uint64) ([]sqsMessageRecord, int) {
749+
if activeGen == 0 {
750+
return in, 0
751+
}
752+
out := make([]sqsMessageRecord, 0, len(in))
753+
dropped := 0
754+
for _, m := range in {
755+
if m.QueueGeneration == activeGen {
756+
out = append(out, m)
757+
continue
758+
}
759+
dropped++
760+
}
761+
return out, dropped
762+
}
763+
680764
func sortMessagesForEmit(msgs []sqsMessageRecord) {
681765
sort.SliceStable(msgs, func(i, j int) bool {
682766
a, b := msgs[i], msgs[j]

internal/backup/sqs_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package backup
33
import (
44
"bufio"
55
"bytes"
6+
"encoding/base64"
67
"encoding/json"
78
"os"
89
"path/filepath"
10+
"slices"
911
"testing"
1012

1113
"github.com/cockroachdb/errors"
@@ -375,6 +377,94 @@ func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) {
375377
}
376378
}
377379

380+
// TestSQS_StaleGenerationMessagesDropped is the regression for Codex
381+
// P1 round 10: PurgeQueue and DeleteQueue bump the queue's generation
382+
// but the affected !sqs|msg|data|<oldGen>|... rows are removed
383+
// asynchronously by the reaper. A snapshot taken mid-cleanup would
384+
// otherwise resurrect those purged messages on restore. The encoder
385+
// now consults the !sqs|queue|gen| record and drops messages whose
386+
// QueueGeneration does not match the active gen, emitting an
387+
// `sqs_stale_generation_messages_dropped` warning for visibility.
388+
func TestSQS_StaleGenerationMessagesDropped(t *testing.T) {
389+
t.Parallel()
390+
enc, root := newSQSEncoder(t)
391+
var events []string
392+
enc.WithWarnSink(func(event string, _ ...any) { events = append(events, event) })
393+
queue := "q"
394+
if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{
395+
"name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60,
396+
})); err != nil {
397+
t.Fatal(err)
398+
}
399+
encQueue := base64.RawURLEncoding.EncodeToString([]byte(queue))
400+
genKey := append([]byte(SQSQueueGenPrefix), []byte(encQueue)...)
401+
if err := enc.HandleQueueGen(genKey, []byte("7")); err != nil {
402+
t.Fatal(err)
403+
}
404+
live := encodeMessageValue(t, map[string]any{
405+
"message_id": "live",
406+
"body": []byte("ok"),
407+
"send_timestamp_millis": 100,
408+
"queue_generation": 7,
409+
})
410+
if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 7, "live"), live); err != nil {
411+
t.Fatal(err)
412+
}
413+
stale := encodeMessageValue(t, map[string]any{
414+
"message_id": "ghost",
415+
"body": []byte("from-prev-gen"),
416+
"send_timestamp_millis": 50,
417+
"queue_generation": 6,
418+
})
419+
if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 6, "ghost"), stale); err != nil {
420+
t.Fatal(err)
421+
}
422+
if err := enc.Finalize(); err != nil {
423+
t.Fatal(err)
424+
}
425+
recs := readMessagesJSONL(t, filepath.Join(root, "sqs", queue, "messages.jsonl"))
426+
if len(recs) != 1 {
427+
t.Fatalf("messages = %d want 1 (stale gen must drop)", len(recs))
428+
}
429+
if recs[0]["message_id"] != "live" {
430+
t.Fatalf("survived msg = %v want live", recs[0]["message_id"])
431+
}
432+
if !slices.Contains(events, "sqs_stale_generation_messages_dropped") {
433+
t.Fatalf("expected sqs_stale_generation_messages_dropped warning, got %v", events)
434+
}
435+
}
436+
437+
// TestSQS_StaleGenerationFilterDisabledWithoutGenRecord asserts the
438+
// safe fallback: a queue with no !sqs|queue|gen| record observed
439+
// keeps the legacy behavior (no filter), so a backup that lacks the
440+
// gen record does not silently lose every message.
441+
func TestSQS_StaleGenerationFilterDisabledWithoutGenRecord(t *testing.T) {
442+
t.Parallel()
443+
enc, root := newSQSEncoder(t)
444+
queue := "q"
445+
if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{
446+
"name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60,
447+
})); err != nil {
448+
t.Fatal(err)
449+
}
450+
val := encodeMessageValue(t, map[string]any{
451+
"message_id": "m1",
452+
"body": []byte("payload"),
453+
"send_timestamp_millis": 1000,
454+
"queue_generation": 99,
455+
})
456+
if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 99, "m1"), val); err != nil {
457+
t.Fatal(err)
458+
}
459+
if err := enc.Finalize(); err != nil {
460+
t.Fatal(err)
461+
}
462+
recs := readMessagesJSONL(t, filepath.Join(root, "sqs", queue, "messages.jsonl"))
463+
if len(recs) != 1 {
464+
t.Fatalf("messages = %d want 1 (no gen record => no filter)", len(recs))
465+
}
466+
}
467+
378468
// TestSQS_MessageBodyEmittedAsTextForUTF8 is the regression for Codex
379469
// P1 round 9 on PR #714: `body` was a `[]byte` field, so json.Encoder
380470
// rendered it as base64 in messages.jsonl. Replaying that JSONL via

0 commit comments

Comments
 (0)