|
9 | 9 | "os" |
10 | 10 | "path/filepath" |
11 | 11 | "sort" |
| 12 | + "strconv" |
12 | 13 | "strings" |
13 | 14 | "unicode/utf8" |
14 | 15 |
|
@@ -108,6 +109,18 @@ type sqsQueueState struct { |
108 | 109 | name string // decoded queue name; populated on meta arrival |
109 | 110 | meta *sqsQueueMetaPublic |
110 | 111 | messages []sqsMessageRecord |
| 112 | + // activeGen captures the queue's current generation, parsed |
| 113 | + // from a !sqs|queue|gen|<encoded> record. PurgeQueue and |
| 114 | + // DeleteQueue bump the generation so the reaper can later |
| 115 | + // drop residual !sqs|msg|data|<queue><oldGen><...> rows; if |
| 116 | + // the snapshot is taken before reaping completes those stale |
| 117 | + // rows are still in the keyspace and must NOT be emitted to |
| 118 | + // messages.jsonl. Zero means "no gen record observed yet" and |
| 119 | + // disables the filter (the live cluster always writes a gen |
| 120 | + // record on CreateQueue, so a missing gen at backup time |
| 121 | + // means we have an orphan-only queue and the orphan branch |
| 122 | + // already drops messages). Codex P1 round 10. |
| 123 | + activeGen uint64 |
111 | 124 | // internalBuf accumulates side records in their on-disk shape if |
112 | 125 | // includeSideRecords is on. Each line is the encoded prefix + |
113 | 126 | // hex(rest-of-key) + value (b64) — implementation-grade detail |
@@ -254,6 +267,28 @@ func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error { |
254 | 267 | return nil |
255 | 268 | } |
256 | 269 |
|
| 270 | +// HandleQueueGen processes one !sqs|queue|gen|<encoded> record. The |
| 271 | +// value is a base-10 decimal string holding the queue's current |
| 272 | +// generation (mirrors adapter/sqs_catalog.go's CreateQueue Put: the |
| 273 | +// live cluster writes strconv.FormatUint(gen, 10)). Capturing |
| 274 | +// activeGen lets flushQueue drop messages tagged with older |
| 275 | +// generations — those are residual rows left by PurgeQueue / |
| 276 | +// DeleteQueue that the reaper has not yet cleaned, and emitting |
| 277 | +// them to messages.jsonl would resurrect purged messages on |
| 278 | +// restore. Codex P1 round 10. |
| 279 | +func (s *SQSEncoder) HandleQueueGen(key, value []byte) error { |
| 280 | + encoded, err := stripPrefixSegment(key, []byte(SQSQueueGenPrefix)) |
| 281 | + if err != nil { |
| 282 | + return err |
| 283 | + } |
| 284 | + gen, err := strconv.ParseUint(strings.TrimSpace(string(value)), 10, 64) //nolint:mnd // 10 == decimal radix |
| 285 | + if err != nil { |
| 286 | + return errors.Wrap(ErrSQSMalformedKey, err.Error()) |
| 287 | + } |
| 288 | + s.queueState(encoded).activeGen = gen |
| 289 | + return nil |
| 290 | +} |
| 291 | + |
257 | 292 | // HandleMessageData processes one !sqs|msg|data|<encQueue><gen><encMsgID> |
258 | 293 | // record. The encoded queue segment is parsed out of the key and used as |
259 | 294 | // the per-queue routing key; the message is buffered until Finalize so it |
@@ -359,19 +394,24 @@ func (s *SQSEncoder) flushQueue(st *sqsQueueState) error { |
359 | 394 | if err := writeFileAtomic(filepath.Join(dir, "_queue.json"), mustMarshalIndent(st.meta)); err != nil { |
360 | 395 | return err |
361 | 396 | } |
362 | | - if len(st.messages) > 0 { |
363 | | - sortMessagesForEmit(st.messages) |
364 | | - jl, err := openJSONL(filepath.Join(dir, "messages.jsonl")) |
365 | | - if err != nil { |
366 | | - return err |
367 | | - } |
368 | | - for i := range st.messages { |
369 | | - if err := jl.enc.Encode(st.messages[i]); err != nil { |
370 | | - _ = closeJSONL(jl) |
371 | | - return errors.WithStack(err) |
372 | | - } |
373 | | - } |
374 | | - if err := closeJSONL(jl); err != nil { |
| 397 | + // Drop messages tagged with stale generations: PurgeQueue and |
| 398 | + // DeleteQueue bump the queue's gen but the reaper deletes the |
| 399 | + // affected !sqs|msg|data| rows asynchronously. A snapshot taken |
| 400 | + // mid-reap would otherwise resurrect purged messages on restore. |
| 401 | + // activeGen == 0 means we did not see a !sqs|queue|gen| record |
| 402 | + // for this queue; preserving the legacy behaviour (no filter) |
| 403 | + // is the safe fallback because every CreateQueue writes a gen. |
| 404 | + // Codex P1 round 10. |
| 405 | + visibleMessages, dropped := filterStaleGenMessages(st.messages, st.activeGen) |
| 406 | + if dropped > 0 { |
| 407 | + s.emitWarn("sqs_stale_generation_messages_dropped", |
| 408 | + "queue", st.name, |
| 409 | + "active_gen", st.activeGen, |
| 410 | + "dropped", dropped, |
| 411 | + "hint", "messages with mismatched queue_generation suppressed; reaper had not finished cleanup at snapshot time") |
| 412 | + } |
| 413 | + if len(visibleMessages) > 0 { |
| 414 | + if err := writeMessagesJSONL(dir, visibleMessages); err != nil { |
375 | 415 | return err |
376 | 416 | } |
377 | 417 | } |
@@ -677,6 +717,50 @@ func decodeSQSMessageValue(value []byte) (sqsMessageRecord, error) { |
677 | 717 | }, nil |
678 | 718 | } |
679 | 719 |
|
| 720 | +// writeMessagesJSONL emits the buffered (visible) messages to |
| 721 | +// messages.jsonl in send-order. Split out of flushQueue to keep that |
| 722 | +// function's cyclomatic complexity under the project's linter ceiling. |
| 723 | +func writeMessagesJSONL(dir string, msgs []sqsMessageRecord) error { |
| 724 | + sortMessagesForEmit(msgs) |
| 725 | + jl, err := openJSONL(filepath.Join(dir, "messages.jsonl")) |
| 726 | + if err != nil { |
| 727 | + return err |
| 728 | + } |
| 729 | + for i := range msgs { |
| 730 | + if err := jl.enc.Encode(msgs[i]); err != nil { |
| 731 | + _ = closeJSONL(jl) |
| 732 | + return errors.WithStack(err) |
| 733 | + } |
| 734 | + } |
| 735 | + if err := closeJSONL(jl); err != nil { |
| 736 | + return err |
| 737 | + } |
| 738 | + return nil |
| 739 | +} |
| 740 | + |
| 741 | +// filterStaleGenMessages partitions in into (visible, droppedCount). |
| 742 | +// A message is visible if activeGen is zero (no gen record observed |
| 743 | +// for this queue, which means we cannot make a confident call) OR |
| 744 | +// its QueueGeneration matches activeGen. Anything else is residual |
| 745 | +// state from a PurgeQueue / DeleteQueue that the reaper has not yet |
| 746 | +// removed; emitting it would resurrect purged messages on restore. |
| 747 | +// Codex P1 round 10. |
| 748 | +func filterStaleGenMessages(in []sqsMessageRecord, activeGen uint64) ([]sqsMessageRecord, int) { |
| 749 | + if activeGen == 0 { |
| 750 | + return in, 0 |
| 751 | + } |
| 752 | + out := make([]sqsMessageRecord, 0, len(in)) |
| 753 | + dropped := 0 |
| 754 | + for _, m := range in { |
| 755 | + if m.QueueGeneration == activeGen { |
| 756 | + out = append(out, m) |
| 757 | + continue |
| 758 | + } |
| 759 | + dropped++ |
| 760 | + } |
| 761 | + return out, dropped |
| 762 | +} |
| 763 | + |
680 | 764 | func sortMessagesForEmit(msgs []sqsMessageRecord) { |
681 | 765 | sort.SliceStable(msgs, func(i, j int) bool { |
682 | 766 | a, b := msgs[i], msgs[j] |
|
0 commit comments