Skip to content

Commit 3f21f73

Browse files
authored
backup: Phase 0b M5-2 - SQS side-record derivation (vis/byage/dedup) (#892)
## Summary Phase 0b M5-2 implementation per the merged design doc [`docs/design/2026_05_30_proposed_sqs_side_record_derivation.md`](https://github.com/bootjp/elastickv/blob/main/docs/design/2026_05_30_proposed_sqs_side_record_derivation.md) (#885). Resolves the SQS side-record decision gate the M5-1 encoder (#846) explicitly deferred. Without this PR, restoring a backup silently breaks: FIFO dedup gating (the next send after restore replays an already-acked message), the by-age reaper (retention is silently ignored, queues grow unboundedly), and visibility (ReceiveMessage finds nothing). ## What lands The encoder now emits three derived side-record families inline during M5-1's per-message walk: | Family | Always? | Value | Why required | |---|---|---|---| | `!sqs\|msg\|vis\|...` | yes | `[]byte(messageID)` | ReceiveMessage scans this prefix | | `!sqs\|msg\|byage\|...` | yes | `[]byte(messageID)` | reaper honors `MessageRetentionPeriod` | | `!sqs\|msg\|dedup\|...` | FIFO + non-empty `MessageDedupID` | `sqsFifoDedupRecord` JSON (4 fields) | gates FIFO sends within 5-minute window | `!sqs|msg|group|` rows are **never** emitted: the live `loadFifoGroupLock` treats key presence alone as "lock held" with no graceful empty-owner decode, so any value (even zeroed) would permanently block every group post-restore (gemini critical #885). Test `TestSQSEncodeSideRecordsNoGroupRows` pins this. Scope is classic queues only (`partition_count = 1`). M5-1's existing `ErrSQSEncodeUnsupportedPartitioned` gate at `encodeQueue` means the side-record walk is never reached for partitioned queues. Partitioned-FIFO support requires coordinated decoder + encoder lift (deferred to M5-3 per the design doc's "Deferred (partitioned-FIFO)" section). ## Test plan `go test -race -count=1 ./internal/backup/` — all green, including: - `TestSQSEncodeSideRecordsCrossCheckClassic` — byte-identical key + value cross-check against the live adapter constructors (`sqsMsgVisKey`, `sqsMsgByAgeKey`, `sqsMsgDedupKey`, `sqsFifoDedupRecord`) for a 2-message FIFO fixture. The gold-standard pattern the parent encoder design (§"Encoder cross-check") mandates. - `TestSQSEncodeSideRecordsStandardQueueOmitsFIFOFamilies` — non-FIFO queue: vis + byage emitted, dedup NOT. - `TestSQSEncodeSideRecordsEmptyDedupOmitsDedupRow` — FIFO + empty `message_deduplication_id`: dedup row NOT emitted. - `TestSQSEncodeSideRecordsNoGroupRows` — FIFO queue across 3 distinct `message_group_id` values: 0 `!sqs|msg|group|` rows emitted (regression pin for gemini critical #885). The two restore round-trip tests listed in the design doc test plan (`TestSQSEncodeFifoRestoreRoundTrip`, `TestSQSEncodeReaperFindsRestoredMessage`) are deferred to the M6 CLI / Jepsen integration layer rather than this package — they require single-node cluster boot that is heavier infra than this package's existing pattern. The cross-check + three negative tests above pin the contract M5-2 is responsible for at the encoder layer; cluster-level restore validation belongs at the integration level. `make lint` clean (verified via golangci-lint --fix on the touched files). ## Self-review (5 passes) 1. **Data loss** — Full reconstruction eliminates the silent-redeliver / invisible-message / retention-leak classes M5-1-only restores would produce. `addSideRecords` surfaces every `b.Add` error and is wired to the existing per-message walk's error path; no swallowed errors. 2. **Concurrency / distributed failures** — Pure offline derivation. No Raft, no HLC, no locks; restored cluster's first sends/receives go through the normal OCC path against the restored state. 3. **Performance** — One `b.Add` per emitted side row, no extra disk reads (records already loaded by M5-1). Snapshot grows ~2-3× message count, all small fixed-width keys. `addMessage`'s existing loop body inflates by ~5 lines, no extra iterations. 4. **Data consistency** — Group-row prohibition pinned by `TestSQSEncodeSideRecordsNoGroupRows`. Dedup gating on `(FIFO && non-empty MessageDedupID)` pinned by two tests. `visibleAt = available_at_millis` matches the parent design's "SQS: vis zeroed by default" contract. Cross-check test validates byte-identical output against the live writer for a shared fixture. 5. **Test coverage** — One cross-check (classic; partitioned variant deferred to M5-3) plus three negative tests cover the three conditional-emission rules the design enumerates. Round-trip tests deferred to M6/Jepsen as discussed above. ## Risk Low. The encoder is offline; restoration is non-destructive (new keyspace on a fresh cluster). The change is additive to M5-1's existing per-message walk and gated on conditions verified by the negative tests; existing M5-1 round-trip tests continue to pass (verified locally). The single semantic-changing behavior is "M5-1-only restores silently break FIFO/reaper/receive" → "restore now works"; no caller of M5-1 depends on the prior broken state. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * SQS message backups now include preservation of visibility metadata, age-based attributes, and FIFO deduplication records for complete message restoration. * **Tests** * Added comprehensive test coverage validating SQS side record encoding across standard and FIFO queue scenarios. <!-- review_stack_entry_start --> [![Review Change Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/bootjp/elastickv/pull/892?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) <!-- review_stack_entry_end --> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents d298cbb + 3f9be73 commit 3f21f73

3 files changed

Lines changed: 439 additions & 0 deletions

File tree

internal/backup/encode_sqs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root
211211
if err != nil {
212212
return err
213213
}
214+
if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil {
215+
return err
216+
}
214217
if seq > maxSeq {
215218
maxSeq = seq
216219
}

internal/backup/encode_sqs_side.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package backup
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/binary"
6+
"encoding/hex"
7+
"encoding/json"
8+
9+
"github.com/cockroachdb/errors"
10+
)
11+
12+
// sqsFifoDedupWindowMillis mirrors adapter/sqs_fifo.go (5 minutes). Restored
13+
// dedup rows that fall outside this window from the backup's send timestamp
14+
// will be expired the next time the live FIFO send path inspects them.
15+
const sqsFifoDedupWindowMillis int64 = 5 * 60 * 1000
16+
17+
// sqsSideKeyAllocBytes mirrors the live adapter's sqsKeyCapLarge tuning
18+
// (adapter/sqs_messages.go:68): a 64-byte tail after the prefix is large
19+
// enough to hold the BE-u64 generation + visibleAt + base64url(messageID)
20+
// for typical queue / message IDs without forcing a re-allocation.
21+
const sqsSideKeyAllocBytes = 64
22+
23+
// sqsFifoDedupRecord mirrors the live struct at adapter/sqs_fifo.go:25.
24+
// Duplicated here (rather than imported) so the encoder package can run
25+
// without a circular dependency on adapter, matching the pattern M3b-3
26+
// used for DynamoDB GSI helpers.
27+
type sqsFifoDedupRecord struct {
28+
MessageID string `json:"message_id"`
29+
SendTimestampMs int64 `json:"send_timestamp_ms"`
30+
ExpiresAtMillis int64 `json:"expires_at_millis"`
31+
OriginalSequence uint64 `json:"original_sequence,omitempty"`
32+
}
33+
34+
// sqsMsgVisKeyBytes reproduces adapter/sqs_messages.go sqsMsgVisKey:
35+
// prefix + base64url(queue) + BE-u64(gen) + BE-u64(visibleAt) +
36+
// base64url(messageID). Negative visibleAt clamps to zero, matching the
37+
// live uint64MaxZero helper.
38+
func sqsMsgVisKeyBytes(queueName string, gen uint64, visibleAtMillis int64, messageID string) []byte {
39+
out := make([]byte, 0, len(SQSMsgVisPrefix)+sqsSideKeyAllocBytes)
40+
out = append(out, SQSMsgVisPrefix...)
41+
out = append(out, encodeSQSSegment(queueName)...)
42+
out = binary.BigEndian.AppendUint64(out, gen)
43+
out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(visibleAtMillis))
44+
return append(out, encodeSQSSegment(messageID)...)
45+
}
46+
47+
// sqsMsgByAgeKeyBytes reproduces adapter/sqs_keys.go sqsMsgByAgeKey:
48+
// prefix + base64url(queue) + BE-u64(gen) + BE-u64(sendTs) +
49+
// base64url(messageID). Negative sendTs clamps to zero.
50+
func sqsMsgByAgeKeyBytes(queueName string, gen uint64, sendTimestampMs int64, messageID string) []byte {
51+
out := make([]byte, 0, len(SQSMsgByAgePrefix)+sqsSideKeyAllocBytes)
52+
out = append(out, SQSMsgByAgePrefix...)
53+
out = append(out, encodeSQSSegment(queueName)...)
54+
out = binary.BigEndian.AppendUint64(out, gen)
55+
out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(sendTimestampMs))
56+
return append(out, encodeSQSSegment(messageID)...)
57+
}
58+
59+
// sqsMsgDedupKeyBytes reproduces adapter/sqs_keys.go sqsMsgDedupKey:
60+
// prefix + base64url(queue) + BE-u64(sqsRestoreGeneration) +
61+
// base64url(dedupID). The live adapter signature accepts a variable gen,
62+
// but every M5-2 call site uses sqsRestoreGeneration (a fresh restore
63+
// has exactly one live generation, with no superseded counters to
64+
// reference), so the parameter is hardcoded here to satisfy unparam.
65+
func sqsMsgDedupKeyBytes(queueName, dedupID string) []byte {
66+
out := make([]byte, 0, len(SQSMsgDedupPrefix)+sqsSideKeyAllocBytes)
67+
out = append(out, SQSMsgDedupPrefix...)
68+
out = append(out, encodeSQSSegment(queueName)...)
69+
out = binary.BigEndian.AppendUint64(out, sqsRestoreGeneration)
70+
return append(out, encodeSQSSegment(dedupID)...)
71+
}
72+
73+
// sqsClampNonNegativeMillis mirrors adapter/sqs_messages.go uint64MaxZero:
74+
// wall-clock millis should never be negative, but a negative int64 would
75+
// silently overflow under a direct uint64() cast and produce a far-future
76+
// key, so clamp to zero defensively.
77+
func sqsClampNonNegativeMillis(v int64) uint64 {
78+
if v < 0 {
79+
return 0
80+
}
81+
return uint64(v)
82+
}
83+
84+
// encodeFifoDedupRecordBytes mirrors adapter/sqs_fifo.go encodeFifoDedupRecord:
85+
// straight json.Marshal of the four-field struct. Wrapped with WithStack so
86+
// callers get a uniform stack-trace at the error site.
87+
func encodeFifoDedupRecordBytes(r *sqsFifoDedupRecord) ([]byte, error) {
88+
b, err := json.Marshal(r)
89+
if err != nil {
90+
return nil, errors.WithStack(err)
91+
}
92+
return b, nil
93+
}
94+
95+
// resolveDedupID mirrors adapter/sqs_fifo.go resolveFifoDedupID exactly: an
96+
// explicit MessageDeduplicationId wins; otherwise on ContentBasedDeduplication
97+
// queues the dedup-id is sha256(body) hex-encoded; otherwise empty (= no row).
98+
//
99+
// This is the critical CBD-correctness path: the live adapter writes only the
100+
// USER-SUPPLIED MessageDeduplicationId into the stored sqsStoredMessage
101+
// (sqs_messages.go:680), NOT the resolved one. So a CBD-FIFO message round-
102+
// trips through the dump as message_deduplication_id="", and a naive
103+
// non-empty-only gate would silently lose dedup protection on restore for
104+
// every CBD queue. We have to redo the live derivation at restore time.
105+
func resolveDedupID(rec *sqsMessageRecord, meta *sqsQueueMetaPublic) string {
106+
if rec.MessageDedupID != "" {
107+
return rec.MessageDedupID
108+
}
109+
if meta.ContentBasedDeduplication {
110+
sum := sha256.Sum256(rec.Body)
111+
return hex.EncodeToString(sum[:])
112+
}
113+
return ""
114+
}
115+
116+
// addSideRecords emits the vis + byage + (conditional) dedup rows that the
117+
// live adapter would have written alongside the !sqs|msg|data| record M5-1
118+
// already stages. No !sqs|msg|group| rows are emitted at any time — see
119+
// docs/design/2026_05_30_proposed_sqs_side_record_derivation.md "Families"
120+
// table for why (loadFifoGroupLock treats key presence alone as "lock held";
121+
// any value would permanently block every group post-restore).
122+
//
123+
// Emission rules:
124+
// - vis: always emitted, visibleAt = rec.AvailableAtMillis. For delayed
125+
// messages captured before their delay expired this is in the future;
126+
// the live ReceiveMessage path honors this exactly as it would for a
127+
// fresh send (the message is invisible until the scheduled time).
128+
// - byage: always emitted, sendTs = rec.SendTimestampMillis (required by
129+
// the reaper to honor MessageRetentionPeriod after restore).
130+
// - dedup: FIFO + resolveDedupID(rec, meta) non-empty. ExpiresAtMillis =
131+
// SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256
132+
// derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID).
133+
func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {
134+
msgIDBytes := []byte(rec.MessageID)
135+
136+
visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
137+
if err := b.Add(visKey, msgIDBytes, 0); err != nil {
138+
return err
139+
}
140+
141+
byAgeKey := sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
142+
if err := b.Add(byAgeKey, msgIDBytes, 0); err != nil {
143+
return err
144+
}
145+
146+
if !meta.FIFO {
147+
return nil
148+
}
149+
dedupID := resolveDedupID(rec, meta)
150+
if dedupID == "" {
151+
return nil
152+
}
153+
dedupRec := &sqsFifoDedupRecord{
154+
MessageID: rec.MessageID,
155+
SendTimestampMs: rec.SendTimestampMillis,
156+
ExpiresAtMillis: rec.SendTimestampMillis + sqsFifoDedupWindowMillis,
157+
OriginalSequence: rec.SequenceNumber,
158+
}
159+
val, err := encodeFifoDedupRecordBytes(dedupRec)
160+
if err != nil {
161+
return err
162+
}
163+
dedupKey := sqsMsgDedupKeyBytes(queueName, dedupID)
164+
return b.Add(dedupKey, val, 0)
165+
}

0 commit comments

Comments
 (0)