Skip to content

Commit 19d33a6

Browse files
committed
Merge remote-tracking branch 'origin/feat/backup-phase0a-sqs' into feat/backup-phase0a-s3
2 parents 0f390b8 + 1621ede commit 19d33a6

2 files changed

Lines changed: 94 additions & 8 deletions

File tree

internal/backup/sqs.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,18 +252,23 @@ func (s *SQSEncoder) HandleSideRecord(prefix string, key, value []byte) error {
252252
}
253253

254254
// Finalize flushes every queue's _queue.json and messages.jsonl. Queues
255-
// with buffered messages but no meta record (orphans) emit a warning and
256-
// are skipped — restoring orphan messages without a queue config would
257-
// silently create a queue with default settings, which is rarely what
258-
// the operator wants.
255+
// with buffered messages but no meta record (orphans) emit a warning
256+
// and have their messages dropped — restoring orphan messages without
257+
// a queue config would silently create a queue with default settings,
258+
// which is rarely what the operator wants. However, if
259+
// --include-sqs-side-records is on and this orphan queue has buffered
260+
// side records (vis/byage/dedup/group/tombstone), those are still
261+
// flushed under the encoded-prefix directory: the most common reason
262+
// for a missing meta is a DeleteQueue that left tombstones, and
263+
// dropping exactly those records is the opposite of what the operator
264+
// asked for. Codex P2 round 8.
259265
func (s *SQSEncoder) Finalize() error {
260266
var firstErr error
261267
for _, st := range s.queues {
262268
if st.meta == nil {
263-
s.emitWarn("sqs_orphan_messages",
264-
"encoded_queue", st.encoded,
265-
"buffered_messages", len(st.messages),
266-
"hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump")
269+
if err := s.flushOrphanQueueSideRecords(st); err != nil && firstErr == nil {
270+
firstErr = err
271+
}
267272
continue
268273
}
269274
if err := s.flushQueue(st); err != nil && firstErr == nil {
@@ -273,6 +278,30 @@ func (s *SQSEncoder) Finalize() error {
273278
return firstErr
274279
}
275280

281+
// flushOrphanQueueSideRecords emits buffered side records for a queue
282+
// whose !sqs|queue|meta row never arrived. Without this branch,
283+
// --include-sqs-side-records silently drops the post-DeleteQueue
284+
// tombstones and dedup-window history operators most often opt in
285+
// for. The orphan dir is named by the encoded prefix because no
286+
// decoded queue name is available; restore tools can join it with
287+
// the messages-dropped warning to reconstruct context.
288+
func (s *SQSEncoder) flushOrphanQueueSideRecords(st *sqsQueueState) error {
289+
s.emitWarn("sqs_orphan_messages",
290+
"encoded_queue", st.encoded,
291+
"buffered_messages", len(st.messages),
292+
"buffered_side_records", len(st.internalBuf),
293+
"hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump")
294+
if !s.includeSideRecords || len(st.internalBuf) == 0 {
295+
return nil
296+
}
297+
// Use the encoded prefix as the directory name — it's the only
298+
// stable identifier available when meta is missing. Suffix it
299+
// with `.orphan` so a restore tool cannot mistake it for a real
300+
// queue dir produced from a successful meta flush.
301+
dir := filepath.Join(s.outRoot, "sqs", st.encoded+".orphan")
302+
return s.flushInternals(dir, st.internalBuf)
303+
}
304+
276305
func (s *SQSEncoder) flushQueue(st *sqsQueueState) error {
277306
dir := filepath.Join(s.outRoot, "sqs", EncodeSegment([]byte(st.name)))
278307
if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode

internal/backup/sqs_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,63 @@ func TestSQS_SideRecordsFlushedEvenWhenZeroMessages(t *testing.T) {
410410
}
411411
}
412412

413+
// TestSQS_OrphanQueueSideRecordsPreserved is the regression for Codex
414+
// P2 round 8: when a queue's !sqs|queue|meta record is missing (e.g.
415+
// after DeleteQueue left tombstones but the meta row was removed) and
416+
// --include-sqs-side-records is on, side records were silently dropped
417+
// alongside any orphan messages. The opt-in contract is the opposite:
418+
// side records exist precisely so deletion-era state is recoverable.
419+
// Now those records flush to a `<encoded>.orphan` directory while the
420+
// orphan-messages warning fires.
421+
func TestSQS_OrphanQueueSideRecordsPreserved(t *testing.T) {
422+
t.Parallel()
423+
enc, root := newSQSEncoder(t)
424+
enc.WithIncludeSideRecords(true)
425+
var events []string
426+
enc.WithWarnSink(func(event string, _ ...any) { events = append(events, event) })
427+
// Side record arrives without a meta row first (deletion-era).
428+
encQueue := "ZGVsZXRlZA" // base64url("deleted")
429+
visKey := append([]byte(SQSMsgVisPrefix), []byte(encQueue)...)
430+
visKey = append(visKey, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01)
431+
if err := enc.HandleSideRecord(SQSMsgVisPrefix, visKey, []byte("vis-record")); err != nil {
432+
t.Fatal(err)
433+
}
434+
if err := enc.Finalize(); err != nil {
435+
t.Fatal(err)
436+
}
437+
if len(events) == 0 || events[0] != "sqs_orphan_messages" {
438+
t.Fatalf("expected sqs_orphan_messages warning, got %v", events)
439+
}
440+
want := filepath.Join(root, "sqs", encQueue+".orphan", "_internals", "side_records.jsonl")
441+
if _, err := os.Stat(want); err != nil {
442+
t.Fatalf("expected orphan side-records file at %s: %v", want, err)
443+
}
444+
}
445+
446+
// TestSQS_OrphanQueueSideRecordsSuppressedWhenOptOut asserts that the
447+
// orphan-side-records branch is gated on --include-sqs-side-records:
448+
// without the flag, the warning still fires but no .orphan dir is
449+
// created (consistent with the default-off contract for side records).
450+
func TestSQS_OrphanQueueSideRecordsSuppressedWhenOptOut(t *testing.T) {
451+
t.Parallel()
452+
enc, root := newSQSEncoder(t)
453+
// includeSideRecords is off by default — HandleSideRecord drops
454+
// the record at intake, so the buffer is empty by Finalize. We
455+
// exercise the path anyway to confirm no .orphan dir is created.
456+
encQueue := "b3B0LW91dA" // base64url("opt-out")
457+
visKey := append([]byte(SQSMsgVisPrefix), []byte(encQueue)...)
458+
visKey = append(visKey, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02)
459+
if err := enc.HandleSideRecord(SQSMsgVisPrefix, visKey, []byte("vis")); err != nil {
460+
t.Fatal(err)
461+
}
462+
if err := enc.Finalize(); err != nil {
463+
t.Fatal(err)
464+
}
465+
if _, err := os.Stat(filepath.Join(root, "sqs", encQueue+".orphan")); !os.IsNotExist(err) {
466+
t.Fatalf("orphan dir created without --include-sqs-side-records: stat err=%v", err)
467+
}
468+
}
469+
413470
func TestSQS_DefaultDoesNotEmitInternals(t *testing.T) {
414471
t.Parallel()
415472
enc, root := newSQSEncoder(t)

0 commit comments

Comments
 (0)