Skip to content

Commit cdb3c87

Browse files
committed
fix(sqs): reject FIFO->Standard redrive policies, not just Standard->FIFO
The existing guard in redriveCandidateToDLQ only rejected the Standard-source -> FIFO-DLQ direction. The symmetric mismatch (FIFO source -> Standard DLQ) still committed: buildDLQRecord copies srcRec.MessageGroupId verbatim, and tryDeliverCandidate takes its FIFO group-lock path whenever MessageGroupId != "" - serializing delivery in a queue that should behave as Standard. Fix: load srcMeta in addition to dlqMeta and require srcMeta.IsFIFO == dlqMeta.IsFIFO before building the OCC dispatch. This is the runtime defense AWS relies on - the catalog accepts a RedrivePolicy that names a queue created or recreated as a different type later, so attribute-time validation cannot catch every case. The existing FIFO-DLQ + empty-MessageGroupId guard is kept as a defense-in-depth rail for malformed FIFO->FIFO records that slip past the type-equality check. Validation logic moved to a validateRedriveTargets helper so redriveCandidateToDLQ stays under the cyclop=10 ceiling per project rules - no //nolint. Adds TestSQSServer_RedrivePolicyStandardDlqRejectsFifoSource which fails on the prior code (DLQ ends up with a record carrying MessageGroupId:g1) and passes after the fix.
1 parent 9e8148c commit cdb3c87

2 files changed

Lines changed: 128 additions & 31 deletions

File tree

adapter/sqs_extra_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,6 +1654,77 @@ func TestSQSServer_RedrivePolicyFifoDlqRejectsStandardSource(t *testing.T) {
16541654
}
16551655
}
16561656

1657+
func TestSQSServer_RedrivePolicyStandardDlqRejectsFifoSource(t *testing.T) {
1658+
t.Parallel()
1659+
// Symmetric to RedrivePolicyFifoDlqRejectsStandardSource: the
1660+
// existing guard rejected only the Standard→FIFO direction, but
1661+
// FIFO→Standard was equally broken. A FIFO source carries
1662+
// MessageGroupId on every record; copying that into a Standard
1663+
// DLQ and then issuing ReceiveMessage on the DLQ trips
1664+
// tryDeliverCandidate's FIFO group-lock branch (gated solely on
1665+
// MessageGroupId != ""), which serializes delivery in a queue
1666+
// clients believe behaves as Standard. AWS rejects mixed-type
1667+
// redrive policies; this test pins that behavior.
1668+
nodes, _, _ := createNode(t, 1)
1669+
defer shutdown(nodes)
1670+
node := sqsLeaderNode(t, nodes)
1671+
1672+
// Standard DLQ (no FifoQueue attribute).
1673+
_, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
1674+
"QueueName": "dlq-standard",
1675+
})
1676+
dlqURL, _ := out["QueueUrl"].(string)
1677+
if dlqURL == "" {
1678+
t.Fatalf("Standard DLQ create failed: %v", out)
1679+
}
1680+
1681+
// FIFO source with RedrivePolicy targeting the Standard DLQ.
1682+
policy := `{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:000000000000:dlq-standard","maxReceiveCount":1}`
1683+
status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
1684+
"QueueName": "src-fifo.fifo",
1685+
"Attributes": map[string]string{
1686+
"FifoQueue": "true",
1687+
"RedrivePolicy": policy,
1688+
},
1689+
})
1690+
if status != http.StatusOK {
1691+
t.Fatalf("create FIFO source with Standard-DLQ policy: %d %v", status, out)
1692+
}
1693+
srcURL, _ := out["QueueUrl"].(string)
1694+
1695+
_, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{
1696+
"QueueUrl": srcURL,
1697+
"MessageBody": "poison",
1698+
"MessageGroupId": "g1",
1699+
"MessageDeduplicationId": "d1",
1700+
})
1701+
// First receive bumps ReceiveCount to 1 == maxReceiveCount;
1702+
// second receive (after visibility expiry) should attempt redrive.
1703+
// The runtime type-compatibility gate must trip, leaving the DLQ
1704+
// empty and the source message intact.
1705+
_, _ = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
1706+
"QueueUrl": srcURL,
1707+
"MaxNumberOfMessages": 1,
1708+
"VisibilityTimeout": 1,
1709+
})
1710+
time.Sleep(1100 * time.Millisecond)
1711+
_, _ = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
1712+
"QueueUrl": srcURL,
1713+
"MaxNumberOfMessages": 1,
1714+
})
1715+
1716+
// Direct DLQ receive: a successful redrive would have written a
1717+
// record with non-empty MessageGroupId here; the fix must keep
1718+
// the DLQ empty.
1719+
_, body := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
1720+
"QueueUrl": dlqURL,
1721+
"MaxNumberOfMessages": 10,
1722+
})
1723+
if msgs, _ := body["Messages"].([]any); len(msgs) > 0 {
1724+
t.Fatalf("Standard DLQ received a redriven FIFO record (would carry MessageGroupId): %v", msgs)
1725+
}
1726+
}
1727+
16571728
func TestSQSServer_TagQueueRequiresTags(t *testing.T) {
16581729
t.Parallel()
16591730
nodes, _, _ := createNode(t, 1)

adapter/sqs_redrive.go

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -121,38 +121,9 @@ func (s *SQSServer) redriveCandidateToDLQ(
121121
srcArn string,
122122
readTS uint64,
123123
) (bool, error) {
124-
// Self-referential RedrivePolicy is rejected at attribute-apply
125-
// time, but a defense-in-depth check here keeps the receive path
126-
// safe even against records committed before the validator
127-
// existed (or under a future relaxation of the validator).
128-
// Without this, redrive would delete and rewrite the same record
129-
// in place, looping the poison message forever.
130-
if policy.DLQName == srcQueueName {
131-
return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
132-
"RedrivePolicy.deadLetterTargetArn must not point at the source queue")
133-
}
134-
dlqMeta, exists, err := s.loadQueueMetaAt(ctx, policy.DLQName, readTS)
124+
dlqMeta, err := s.validateRedriveTargets(ctx, srcQueueName, srcRec, policy, readTS)
135125
if err != nil {
136-
return false, errors.WithStack(err)
137-
}
138-
if !exists {
139-
// DLQ vanished between policy-set and receive. Refusing the
140-
// move keeps the source message in flight; the operator can
141-
// recreate the DLQ or detach the policy.
142-
return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
143-
"RedrivePolicy targets non-existent DLQ "+policy.DLQName)
144-
}
145-
// FIFO DLQs require source records to carry MessageGroupId so the
146-
// DLQ-side ReceiveMessage enforces group-lock semantics. Without
147-
// this gate, a redrive from a Standard source into a FIFO DLQ
148-
// would write records with empty MessageGroupId — tryDeliverCandidate
149-
// only takes the FIFO path when MessageGroupId is non-empty, so
150-
// those messages would bypass FIFO ordering inside what clients
151-
// believe is a strictly-ordered queue. We refuse the move and
152-
// surface the misconfiguration to operators.
153-
if dlqMeta.IsFIFO && srcRec.MessageGroupId == "" {
154-
return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
155-
"FIFO DLQ requires source records to carry MessageGroupId")
126+
return false, err
156127
}
157128
dlqRec, dlqRecordBytes, err := buildDLQRecord(srcRec, dlqMeta, srcArn)
158129
if err != nil {
@@ -171,6 +142,61 @@ func (s *SQSServer) redriveCandidateToDLQ(
171142
return true, nil
172143
}
173144

145+
// validateRedriveTargets enforces every static precondition on the
146+
// (source, DLQ, policy) triple before the OCC dispatch is built.
147+
// Returns the loaded DLQ meta on success so the caller does not have
148+
// to re-load it.
149+
//
150+
// Failure modes (all surfaced as 4xx sqsAPIError):
151+
// - self-referential RedrivePolicy (defense-in-depth against records
152+
// that predate the attribute-time validator),
153+
// - DLQ vanished between policy-set and receive,
154+
// - source queue vanished mid-redrive (DeleteQueue race),
155+
// - source/DLQ queue-type mismatch (FIFO ↔ Standard) — AWS forbids
156+
// this and runtime is the only place it can be enforced because
157+
// the catalog accepts a RedrivePolicy that names a queue created
158+
// or recreated later as a different type,
159+
// - FIFO DLQ paired with a source record lacking MessageGroupId
160+
// (defense in depth against malformed records that slip past the
161+
// type-equality check).
162+
func (s *SQSServer) validateRedriveTargets(
163+
ctx context.Context,
164+
srcQueueName string,
165+
srcRec *sqsMessageRecord,
166+
policy *parsedRedrivePolicy,
167+
readTS uint64,
168+
) (*sqsQueueMeta, error) {
169+
if policy.DLQName == srcQueueName {
170+
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
171+
"RedrivePolicy.deadLetterTargetArn must not point at the source queue")
172+
}
173+
dlqMeta, dlqExists, err := s.loadQueueMetaAt(ctx, policy.DLQName, readTS)
174+
if err != nil {
175+
return nil, errors.WithStack(err)
176+
}
177+
if !dlqExists {
178+
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
179+
"RedrivePolicy targets non-existent DLQ "+policy.DLQName)
180+
}
181+
srcMeta, srcExists, err := s.loadQueueMetaAt(ctx, srcQueueName, readTS)
182+
if err != nil {
183+
return nil, errors.WithStack(err)
184+
}
185+
if !srcExists {
186+
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist,
187+
"source queue disappeared during redrive")
188+
}
189+
if srcMeta.IsFIFO != dlqMeta.IsFIFO {
190+
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
191+
"RedrivePolicy queue-type mismatch: source and DLQ must both be FIFO or both Standard")
192+
}
193+
if dlqMeta.IsFIFO && srcRec.MessageGroupId == "" {
194+
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
195+
"FIFO DLQ requires source records to carry MessageGroupId")
196+
}
197+
return dlqMeta, nil
198+
}
199+
174200
// buildDLQRecord assembles the DLQ-side message record. Reset
175201
// ReceiveCount and FirstReceiveMillis so the DLQ consumer sees a
176202
// fresh delivery, not the source's bounce history.

0 commit comments

Comments
 (0)