Commit 0bd04e9
fix(sqs): assign FIFO seq on FIFO->FIFO redrive + reject mixed attribute values
Two findings from the latest Codex review on `cdb3c87` that the prior "Ready to merge" verdicts had not addressed.

P1 — FIFO->FIFO redrive committed `SequenceNumber=0` and never advanced the DLQ's sqsQueueSeqKey (`sqs_redrive.go:265-272, 213`). Once `cdb3c87` enabled FIFO->FIFO redrives by enforcing type equality, the SequenceNumber path became reachable and showed:

- the DLQ record carried the zero-value `SequenceNumber` (AWS surfaces this verbatim; AWS sequences start at 1, never 0),
- the DLQ's per-queue sequence counter was never incremented, so a subsequent normal FIFO send to the DLQ assigned a number lower than the redriven message — non-monotonic to consumers, breaking the FIFO contract,
- two concurrent FIFO->FIFO redrives both wrote `SequenceNumber=0` with no OCC conflict because `sqsQueueSeqKey(policy.DLQName)` was not on `ReadKeys`.

Fix: when `dlqMeta.IsFIFO`, `redriveCandidateToDLQ` now loads the DLQ's seq counter at `readTS`, computes `nextSeq = prev+1`, and threads the value through `buildDLQRecord` (where it is stamped onto the record before encoding) and `buildRedriveOps` (where the seq Put is added to `Elems` and `sqsQueueSeqKey(policy.DLQName)` is added to `ReadKeys`). Standard DLQs are unchanged: `dlqSeq=0` flows through unused.

P2 — `validateOneMessageAttribute` accepted Binary attributes that also carried a non-empty `StringValue` (`sqs_messages.go:1523-1527`). AWS requires each MessageAttributeValue to populate exactly one of {StringValue, BinaryValue}; submitting both is an `InvalidParameterValue` error. The previous validator only checked that the type-required field was non-empty and silently let the foreign field through, where it would be persisted into the record and round-tripped on ReceiveMessage with mismatched MD5 hashes.

Fix: extracted the type-specific value-pair check into `validateMessageAttributeValuePair` and added the symmetric guards — Binary rejects `StringValue != ""`, String/Number rejects `len(BinaryValue) > 0`. The split also keeps `validateOneMessageAttribute` under the cyclop budget.

Tests:

- `TestSQSServer_FifoFifoRedriveAssignsSequenceNumber` (`sqs_extra_test.go`): create a FIFO source + FIFO DLQ, send a poison message, redrive on the second receive, assert the DLQ message's SequenceNumber > 0, then send a fresh FIFO message to the DLQ and assert its SequenceNumber == redrivenSeq + 1 (monotonic continuation, the invariant the fix guarantees).
- `TestSQSServer_SendMessageRejectsBinaryWithStringValue` (`sqs_extra_test.go`): table-driven over the three illegal combinations (Binary + non-empty StringValue, String + non-empty BinaryValue, Number + non-empty BinaryValue); each must return HTTP 400 / InvalidAttributeValue.

Per CLAUDE.md self-review:

1. Data loss — none. The poison message body / attributes flow through unchanged; only the SequenceNumber field and the DLQ counter change.
2. Concurrency — `sqsQueueSeqKey(policy.DLQName)` is now on ReadKeys, so two concurrent FIFO->FIFO redrives serialise on write-write conflict, same as concurrent FIFO sends.
3. Performance — one extra `loadFifoSequence` per FIFO redrive; negligible next to the existing `validateRedriveTargets` reads.
4. Data consistency — the DLQ SequenceNumber is now monotonic across both the redrive and normal-send paths; no two messages can share a sequence number on the same FIFO queue.
5. Test coverage — both fixes ship with co-located regression tests; the existing FIFO-related tests (`TestSQSServer_FifoSequenceNumberMonotonic`, `TestSQSServer_DLQRedriveOnMaxReceiveCount`, `TestSQSServer_RedrivePolicy*`) remain green.
1 parent cdb3c87 commit 0bd04e9

3 files changed

Lines changed: 230 additions & 3 deletions

adapter/sqs_extra_test.go

Lines changed: 161 additions & 0 deletions
@@ -1746,3 +1746,164 @@ func TestSQSServer_TagQueueRequiresTags(t *testing.T) {
 		t.Fatalf("untag without TagKeys: %d %v", status, out)
 	}
 }
+
+// TestSQSServer_FifoFifoRedriveAssignsSequenceNumber pins the round-N
+// Codex P1 fix on `cdb3c87` redrive: a FIFO source whose RedrivePolicy
+// targets a FIFO DLQ used to commit the DLQ record with
+// SequenceNumber = 0 (zero-value, not even AWS's "starts at 1"
+// invariant), and the DLQ's per-queue sequence counter
+// (sqsQueueSeqKey) was never advanced. Consumers reading the DLQ saw
+// 0 verbatim, and a normal FIFO send to the DLQ later produced a
+// number lower than the redriven message — non-monotonic per AWS's
+// FIFO contract. The fix loads the DLQ seq at readTS, increments it,
+// stamps it onto the DLQ record, and includes the seq Put in the OCC
+// transaction. This test asserts both halves: (a) the redriven DLQ
+// record carries SequenceNumber = 1, (b) a subsequent FIFO send to
+// the DLQ carries SequenceNumber = 2.
+func TestSQSServer_FifoFifoRedriveAssignsSequenceNumber(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	// FIFO DLQ.
+	_, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  "dlq-fifo.fifo",
+		"Attributes": map[string]string{"FifoQueue": "true"},
+	})
+	dlqURL, _ := out["QueueUrl"].(string)
+	if dlqURL == "" {
+		t.Fatalf("FIFO DLQ create failed: %v", out)
+	}
+
+	// FIFO source pointing at the FIFO DLQ. Both queues are FIFO so
+	// the type-equality guard added in cdb3c87 lets the redrive
+	// proceed; this test exercises the SequenceNumber assignment that
+	// became reachable only with that change.
+	policy := `{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:000000000000:dlq-fifo.fifo","maxReceiveCount":1}`
+	_, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName": "src-fifo.fifo",
+		"Attributes": map[string]string{
+			"FifoQueue":     "true",
+			"RedrivePolicy": policy,
+		},
+	})
+	srcURL, _ := out["QueueUrl"].(string)
+
+	// Send a poison message to the FIFO source.
+	_, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{
+		"QueueUrl":               srcURL,
+		"MessageBody":            "poison",
+		"MessageGroupId":         "g1",
+		"MessageDeduplicationId": "d-poison",
+	})
+
+	// First receive bumps ReceiveCount to 1 == maxReceiveCount;
+	// second receive (after visibility expiry) triggers the redrive.
+	_, _ = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+		"QueueUrl":            srcURL,
+		"MaxNumberOfMessages": 1,
+		"VisibilityTimeout":   1,
+	})
+	time.Sleep(1100 * time.Millisecond)
+	_, _ = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+		"QueueUrl":            srcURL,
+		"MaxNumberOfMessages": 1,
+	})
+
+	// DLQ now has the moved message. SequenceNumber must be > 0;
+	// pre-fix it was 0 (zero-value, never set).
+	_, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+		"QueueUrl":            dlqURL,
+		"MaxNumberOfMessages": 1,
+		"VisibilityTimeout":   60,
+	})
+	msgs, _ := out["Messages"].([]any)
+	if len(msgs) != 1 {
+		t.Fatalf("DLQ expected 1 redriven message, got %d (%v)", len(msgs), out)
+	}
+	moved, _ := msgs[0].(map[string]any)
+	movedAttrs, _ := moved["Attributes"].(map[string]any)
+	movedSeqStr, _ := movedAttrs["SequenceNumber"].(string)
+	movedSeq, _ := strconv.ParseUint(movedSeqStr, 10, 64)
+	if movedSeq == 0 {
+		t.Fatalf("redriven DLQ message has SequenceNumber=0 (regression: counter not advanced); attrs=%v", movedAttrs)
+	}
+
+	// Second half: a normal FIFO send to the DLQ must observe the
+	// advanced counter and assign movedSeq + 1, not start over from
+	// 1 (which would put two messages with the same sequence on the
+	// queue, the exact bug the fix is preventing).
+	follow := sendFifoMessage(t, node, dlqURL, "g-dlq", "d-follow", "follow")
+	if follow != movedSeq+1 {
+		t.Fatalf("DLQ FIFO send after redrive: SequenceNumber=%d, want %d (= %d + 1)", follow, movedSeq+1, movedSeq)
+	}
+}
+
+// TestSQSServer_SendMessageRejectsBinaryWithStringValue pins the
+// round-N Codex P2 fix: AWS requires a MessageAttributeValue to
+// populate exactly one of {StringValue, BinaryValue}. The previous
+// validator only checked that BinaryValue was non-empty for Binary
+// types; an attribute carrying both fields would be persisted into
+// the record (and round-tripped on ReceiveMessage), which is not
+// AWS behavior and would surface mismatched MD5 hashes downstream.
+// The symmetric String/Number + non-empty BinaryValue case is also
+// asserted.
+func TestSQSServer_SendMessageRejectsBinaryWithStringValue(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+	queueURL := createSQSQueueForTest(t, node, "binary-string-mix")
+
+	cases := []struct {
+		name  string
+		attrs map[string]any
+	}{
+		{
+			name: "Binary with non-empty StringValue",
+			attrs: map[string]any{
+				"X": map[string]any{
+					"DataType":    "Binary",
+					"BinaryValue": []byte{0x01, 0x02, 0x03},
+					"StringValue": "stowaway",
+				},
+			},
+		},
+		{
+			name: "String with non-empty BinaryValue",
+			attrs: map[string]any{
+				"X": map[string]any{
+					"DataType":    "String",
+					"StringValue": "ok",
+					"BinaryValue": []byte{0x01},
+				},
+			},
+		},
+		{
+			name: "Number with non-empty BinaryValue",
+			attrs: map[string]any{
+				"X": map[string]any{
+					"DataType":    "Number",
+					"StringValue": "1",
+					"BinaryValue": []byte{0x01},
+				},
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
+				"QueueUrl":          queueURL,
+				"MessageBody":       "body",
+				"MessageAttributes": tc.attrs,
+			})
+			if status != http.StatusBadRequest {
+				t.Fatalf("expected 400, got %d (%v)", status, out)
+			}
+			if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue {
+				t.Fatalf("error type: %q want %q (%v)", got, sqsErrInvalidAttributeValue, out)
+			}
+		})
+	}
+}

adapter/sqs_messages.go

Lines changed: 20 additions & 0 deletions
@@ -1514,17 +1514,37 @@ func validateOneMessageAttribute(name string, v sqsMessageAttributeValue) error
 	if i := strings.Index(v.DataType, "."); i >= 0 {
 		base = v.DataType[:i]
 	}
+	return validateMessageAttributeValuePair(name, base, v)
+}
+
+// validateMessageAttributeValuePair enforces "exactly one value field
+// populated, matching the DataType" on a MessageAttributeValue. AWS
+// rejects an attribute that carries both StringValue and BinaryValue
+// (or that carries the wrong one for its DataType); without these
+// guards a malformed client could persist bytes into the record that
+// then round-trip on ReceiveMessage, producing mismatched MD5 hashes
+// downstream. Pulled out of validateOneMessageAttribute so that
+// function stays under the cyclop budget.
+func validateMessageAttributeValuePair(name, base string, v sqsMessageAttributeValue) error {
 	switch base {
 	case "String", "Number":
 		if v.StringValue == "" {
 			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
 				"MessageAttribute "+name+" requires StringValue")
 		}
+		if len(v.BinaryValue) > 0 {
+			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+				"MessageAttribute "+name+" must not include BinaryValue for "+base+" type")
+		}
 	case sqsAttributeBaseTypeBinary:
 		if len(v.BinaryValue) == 0 {
 			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
 				"MessageAttribute "+name+" requires BinaryValue")
 		}
+		if v.StringValue != "" {
+			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+				"MessageAttribute "+name+" must not include StringValue for Binary type")
+		}
 	default:
 		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
 			"MessageAttribute "+name+" has unsupported DataType "+v.DataType)

adapter/sqs_redrive.go

Lines changed: 49 additions & 3 deletions
@@ -3,6 +3,7 @@ package adapter
 import (
 	"context"
 	"net/http"
+	"strconv"
 	"strings"
 	"time"
 
@@ -125,11 +126,28 @@ func (s *SQSServer) redriveCandidateToDLQ(
 	if err != nil {
 		return false, err
 	}
-	dlqRec, dlqRecordBytes, err := buildDLQRecord(srcRec, dlqMeta, srcArn)
+	// FIFO DLQs require the redrive write to participate in the
+	// per-queue SequenceNumber sequence, otherwise the DLQ record
+	// is committed with SequenceNumber=0 (AWS surfaces this
+	// verbatim, and 0 violates AWS's invariant that sequences
+	// start at 1) and the next normal FIFO send to the DLQ assigns
+	// a number lower than the redriven message — non-monotonic to
+	// consumers. Load the seq snapshot at readTS, increment, and
+	// pass it into both buildDLQRecord (encoded onto the record)
+	// and buildRedriveOps (Put + ReadKeys fence).
+	var dlqSeq uint64
+	if dlqMeta.IsFIFO {
+		prevSeq, err := s.loadFifoSequence(ctx, policy.DLQName, readTS)
+		if err != nil {
+			return false, err
+		}
+		dlqSeq = prevSeq + 1
+	}
+	dlqRec, dlqRecordBytes, err := buildDLQRecord(srcRec, dlqMeta, srcArn, dlqSeq)
 	if err != nil {
 		return false, err
 	}
-	req, err := s.buildRedriveOps(ctx, srcQueueName, srcGen, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, readTS)
+	req, err := s.buildRedriveOps(ctx, srcQueueName, srcGen, cand, srcDataKey, srcRec, policy, dlqMeta, dlqRec, dlqRecordBytes, dlqSeq, readTS)
 	if err != nil {
 		return false, err
 	}
@@ -200,7 +218,16 @@ func (s *SQSServer) validateRedriveTargets(
 // buildDLQRecord assembles the DLQ-side message record. Reset
 // ReceiveCount and FirstReceiveMillis so the DLQ consumer sees a
 // fresh delivery, not the source's bounce history.
-func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn string) (*sqsMessageRecord, []byte, error) {
+//
+// dlqSeq is the SequenceNumber to assign on the DLQ record, computed
+// by the caller as `loadFifoSequence(dlq) + 1` for FIFO DLQs and
+// passed as 0 for Standard DLQs (the field is unused in that case).
+// The seq must be the same value the caller will Put into the DLQ's
+// sqsQueueSeqKey inside the same OCC transaction (see buildRedriveOps);
+// otherwise the redriven message and the on-disk counter disagree
+// and a later FIFO send to the DLQ produces a non-monotonic
+// SequenceNumber.
+func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn string, dlqSeq uint64) (*sqsMessageRecord, []byte, error) {
 	dlqMsgID, err := newMessageIDHex()
 	if err != nil {
 		return nil, nil, errors.WithStack(err)
@@ -227,6 +254,7 @@ func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn stri
 		MessageGroupId:         srcRec.MessageGroupId,
 		MessageDeduplicationId: srcRec.MessageDeduplicationId,
 		DeadLetterSourceArn:    srcArn,
+		SequenceNumber:         dlqSeq,
 	}
 	body, err := encodeSQSMessageRecord(rec)
 	if err != nil {
@@ -239,6 +267,14 @@ func buildDLQRecord(srcRec *sqsMessageRecord, dlqMeta *sqsQueueMeta, srcArn stri
 // atomically removes the source's keyspace and writes the DLQ
 // version. The FIFO group-lock release branch lives here so the
 // caller stays under the cyclomatic budget.
+//
+// dlqSeq is non-zero only when the DLQ is FIFO (per the caller's
+// pre-load via loadFifoSequence). When non-zero, the txn additionally
+// reads sqsQueueSeqKey(policy.DLQName) — guarding against a
+// concurrent FIFO send / redrive racing for the same sequence — and
+// writes the new value back. dlqRec.SequenceNumber is already set to
+// dlqSeq inside buildDLQRecord; this function is responsible only for
+// the OCC plumbing.
 func (s *SQSServer) buildRedriveOps(
 	ctx context.Context,
 	srcQueueName string,
@@ -250,6 +286,7 @@ func (s *SQSServer) buildRedriveOps(
 	dlqMeta *sqsQueueMeta,
 	dlqRec *sqsMessageRecord,
 	dlqRecordBytes []byte,
+	dlqSeq uint64,
 	readTS uint64,
 ) (*kv.OperationGroup[kv.OP], error) {
 	now := dlqRec.SendTimestampMillis
@@ -270,6 +307,15 @@ func (s *SQSServer) buildRedriveOps(
 		{Op: kv.Put, Key: dlqVisKey, Value: []byte(dlqRec.MessageID)},
 		{Op: kv.Put, Key: dlqByAgeKey, Value: []byte(dlqRec.MessageID)},
 	}
+	if dlqMeta.IsFIFO {
+		seqKey := sqsQueueSeqKey(policy.DLQName)
+		readKeys = append(readKeys, seqKey)
+		elems = append(elems, &kv.Elem[kv.OP]{
+			Op:    kv.Put,
+			Key:   seqKey,
+			Value: []byte(strconv.FormatUint(dlqSeq, 10)),
+		})
+	}
 	if srcRec.MessageGroupId != "" {
 		lockKey := sqsMsgGroupKey(srcQueueName, srcGen, srcRec.MessageGroupId)
 		lock, err := s.loadFifoGroupLock(ctx, srcQueueName, srcGen, srcRec.MessageGroupId, readTS)