Skip to content

Commit 65ea67c

Browse files
committed
backup: preserve HT-FIFO queue attributes (PR #714, round 6)
Codex P1 round 12 (commit 5c6a68b): `decodeSQSQueueMetaValue` only projected legacy SQS fields into _queue.json. The HT-FIFO immutable attributes (PartitionCount, FifoThroughputLimit, DeduplicationScope) were captured by the live catalog at CreateQueue but silently dropped by the backup decoder. The adapter rejects mutating these via SetQueueAttributes (they are immutable per AWS contract), so a backup→restore roundtrip would silently recreate single-partition / default-routing / queue-scoped-dedup queues — non-fidelity preserving for any partitioned FIFO workload. Add the three fields to sqsQueueMetaPublic and the live decode struct so they round-trip through the dump. Test: TestSQS_QueueMetaPreservesHTFIFOAttributes drives a 4-partition perMessageGroupId / messageGroup-dedup queue and asserts each field is read back from _queue.json.
1 parent 5c6a68b commit 65ea67c

2 files changed

Lines changed: 57 additions & 0 deletions

File tree

internal/backup/sqs.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,14 @@ type sqsInternalRecord struct {
137137
// sqsQueueMetaPublic is the dump-format projection of the live
138138
// adapter/sqs_catalog.go sqsQueueMeta. Field names match the AWS-style
139139
// vocabulary an external restore tool would use.
140+
//
141+
// PartitionCount, FifoThroughputLimit, and DeduplicationScope mirror
142+
// the HT-FIFO attributes captured at CreateQueue time. The adapter
143+
// rejects mutating these via SetQueueAttributes (they are immutable
144+
// per AWS contract), so dropping them at backup would silently
145+
// recreate single-partition / default-routing / queue-scoped-dedup
146+
// queues on restore — non-fidelity-preserving for any partitioned
147+
// FIFO workload. Codex P1 round 12.
140148
type sqsQueueMetaPublic struct {
141149
FormatVersion uint32 `json:"format_version"`
142150
Name string `json:"name"`
@@ -148,6 +156,9 @@ type sqsQueueMetaPublic struct {
148156
ReceiveMessageWaitSeconds int64 `json:"receive_message_wait_seconds,omitempty"`
149157
MaximumMessageSize int64 `json:"maximum_message_size,omitempty"`
150158
RedrivePolicy string `json:"redrive_policy,omitempty"`
159+
PartitionCount uint32 `json:"partition_count,omitempty"`
160+
FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"`
161+
DeduplicationScope string `json:"deduplication_scope,omitempty"`
151162
}
152163

153164
// sqsMessageBody is the dump-format projection's body field. It marshals
@@ -643,6 +654,10 @@ func decodeSQSQueueMetaValue(value []byte) (*sqsQueueMetaPublic, error) {
643654
ReceiveMessageWaitSeconds int64 `json:"receive_message_wait_seconds"`
644655
MaximumMessageSize int64 `json:"maximum_message_size"`
645656
RedrivePolicy string `json:"redrive_policy"`
657+
// HT-FIFO immutable attributes — see adapter/sqs_catalog.go.
658+
PartitionCount uint32 `json:"partition_count"`
659+
FifoThroughputLimit string `json:"fifo_throughput_limit"`
660+
DeduplicationScope string `json:"deduplication_scope"`
646661
}
647662
if err := json.Unmarshal(body, &live); err != nil {
648663
return nil, errors.Wrap(ErrSQSInvalidQueueMeta, err.Error())
@@ -658,6 +673,9 @@ func decodeSQSQueueMetaValue(value []byte) (*sqsQueueMetaPublic, error) {
658673
ReceiveMessageWaitSeconds: live.ReceiveMessageWaitSeconds,
659674
MaximumMessageSize: live.MaximumMessageSize,
660675
RedrivePolicy: live.RedrivePolicy,
676+
PartitionCount: live.PartitionCount,
677+
FifoThroughputLimit: live.FifoThroughputLimit,
678+
DeduplicationScope: live.DeduplicationScope,
661679
}, nil
662680
}
663681

internal/backup/sqs_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,45 @@ func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) {
377377
}
378378
}
379379

380+
// TestSQS_QueueMetaPreservesHTFIFOAttributes is the regression for
381+
// Codex P1 round 12: PartitionCount, FifoThroughputLimit, and
382+
// DeduplicationScope are immutable HT-FIFO attributes captured at
383+
// CreateQueue and rejected by SetQueueAttributes. Dropping them at
384+
// backup time would silently recreate single-partition / default-
385+
// routing / queue-scoped-dedup queues on restore — non-fidelity
386+
// preserving for any partitioned FIFO workload.
387+
func TestSQS_QueueMetaPreservesHTFIFOAttributes(t *testing.T) {
388+
t.Parallel()
389+
enc, root := newSQSEncoder(t)
390+
queue := "ht-fifo.fifo"
391+
val := encodeQueueMetaValue(t, map[string]any{
392+
"name": queue,
393+
"is_fifo": true,
394+
"content_based_dedup": true,
395+
"visibility_timeout_seconds": 30,
396+
"message_retention_seconds": 345600,
397+
"partition_count": 4,
398+
"fifo_throughput_limit": "perMessageGroupId",
399+
"deduplication_scope": "messageGroup",
400+
})
401+
if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), val); err != nil {
402+
t.Fatal(err)
403+
}
404+
if err := enc.Finalize(); err != nil {
405+
t.Fatal(err)
406+
}
407+
got := readQueueJSON(t, filepath.Join(root, "sqs", queue, "_queue.json"))
408+
if floatField(t, got, "partition_count") != 4 {
409+
t.Fatalf("partition_count = %v want 4", got["partition_count"])
410+
}
411+
if got["fifo_throughput_limit"] != "perMessageGroupId" {
412+
t.Fatalf("fifo_throughput_limit = %v want perMessageGroupId", got["fifo_throughput_limit"])
413+
}
414+
if got["deduplication_scope"] != "messageGroup" {
415+
t.Fatalf("deduplication_scope = %v want messageGroup", got["deduplication_scope"])
416+
}
417+
}
418+
380419
// TestSQS_StaleGenerationMessagesDropped is the regression for Codex
381420
// P1 round 10: PurgeQueue and DeleteQueue bump the queue's generation
382421
// but the affected !sqs|msg|data|<oldGen>|... rows are removed

0 commit comments

Comments
 (0)