Skip to content

Commit fc35ee5

Browse files
committed
adapter/sqs: align peek partition iteration with effectivePartitionCount
Three r3 findings on PR #794: 1. Codex P2 (sqs_admin_peek.go:268). peekStartPartition and walkPeek's partition advance / termination keyed off meta.PartitionCount directly. In perQueue throughput mode (FifoThroughputLimit=perQueue) the data plane collapses every MessageGroupId to partition 0 (see effectivePartitionCount in sqs_keys_dispatch.go); peek still rotated over all N partitions, wasting up to N-1 guaranteed-empty ScanAt calls per peek (31 extra scans at PartitionCount=32). Switch to effectivePartitionCount(meta) in peekStartPartition, walkPeek's partition advance, and preparePeekCursor's bounds check. Caller audit: effectivePartitionCount is the same helper receiveMessage uses for its fanout loop (sqs_messages.go:885), so peek and receive now agree on partition usage. 2. CodeRabbit minor (test order-coupling). HappyPath asserted row ordering matched send order, but vis-index entries with the same visible_at millisecond tie-break on message_id (random hex) — a timing-sensitive flake. Replace ordered loop with set-based assertion via assertPeekRowsAsSet. 3. CodeRabbit minor (counter type assertion panic). The fanout counter test used `counter, _ := v.(*atomic.Uint32)` then counter.Load(), which panics if the assertion fails. Extract loadFanoutCounter helper that fails the test explicitly on bad type / nil rather than panicking. Tests: TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition pins the perQueue functional contract end-to-end. TestPreparePeekCursor_PerQueueCollapse pins the cursor codec's effective-bounds validation (a Partition=2 cursor against a perQueue PartitionCount=4 queue is rejected — even though the raw PartitionCount would otherwise allow it).
1 parent 3e6841e commit fc35ee5

2 files changed

Lines changed: 157 additions & 54 deletions

File tree

adapter/sqs_admin_peek.go

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -262,40 +262,56 @@ func (s *SQSServer) AdminPeekQueue(
262262
// peek pages between receives lands every receive on the same
263263
// partition, starving the other three).
264264
func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, meta *sqsQueueMeta) uint32 {
265-
if cursor != nil || meta.PartitionCount <= 1 {
265+
if cursor != nil {
266266
return 0
267267
}
268-
return s.nextReceiveFanoutStart(queueName, meta.PartitionCount)
268+
effective := effectivePartitionCount(meta)
269+
if effective <= 1 {
270+
return 0
271+
}
272+
return s.nextReceiveFanoutStart(queueName, effective)
269273
}
270274

271275
// preparePeekCursor builds the effective cursor for this call. On the
272276
// first page (cursor==nil) it stamps Generation + a rotated
273-
// StartPartition (partitioned queues only — non-partitioned start at
274-
// 0). On a follow-up page it validates the stored Generation matches
275-
// the queue's current generation AND that StartPartition / Partition
276-
// are within [0, max(PartitionCount, 1)); a mismatch on either
277+
// StartPartition (partitioned queues only — non-partitioned and
278+
// perQueue-throughput FIFO queues collapse to partition 0). On a
279+
// follow-up page it validates the stored Generation matches the
280+
// queue's current generation AND that StartPartition / Partition are
281+
// within [0, effectivePartitionCount(meta)); a mismatch on either
277282
// returns ErrAdminSQSValidation.
278283
//
284+
// effectivePartitionCount (not meta.PartitionCount) is the
285+
// authoritative iteration bound. perQueue FIFO mode collapses every
286+
// MessageGroupId to partition 0 (see partitionFor /
287+
// effectivePartitionCount), so partitions 1..N-1 are guaranteed empty
288+
// for those queues; walking them would be pointless read
289+
// amplification (Codex r3 P2). Keying validation off the effective
290+
// count keeps peek aligned with the data-plane's actual partition
291+
// usage.
292+
//
279293
// Bounds-check rationale: walkPeek terminates the partitioned
280-
// rotation when `(Partition + 1) % PartitionCount == StartPartition`.
281-
// If a client supplies StartPartition outside [0, PartitionCount),
282-
// that termination condition never fires and the call loops
283-
// ScanAt-by-ScanAt over guaranteed-empty partitions forever — a
284-
// request-amplification DoS against the admin endpoint (Codex r1
285-
// P1 on PR #794). Rejecting bad cursor partition indices up-front
286-
// closes the vector. Generation mismatch separately forces a
287-
// front-of-stream refresh after a purge.
294+
// rotation when `(Partition + 1) % effectivePartitionCount ==
295+
// StartPartition`. If a client supplies StartPartition outside
296+
// [0, effectivePartitionCount), that termination condition never
297+
// fires and the call loops ScanAt-by-ScanAt over guaranteed-empty
298+
// partitions forever — a request-amplification DoS against the
299+
// admin endpoint (Codex r1 P1 on PR #794). Rejecting bad cursor
300+
// partition indices up-front closes the vector. Generation mismatch
301+
// separately forces a front-of-stream refresh after a purge.
288302
//
289303
// startPartition is computed by the caller (the SQSServer method's
290-
// nextReceiveFanoutStart) so this helper stays method-free for
291-
// unit-testability. Non-partitioned queues pass 0.
304+
// peekStartPartition wrapper around nextReceiveFanoutStart) so this
305+
// helper stays method-free for unit-testability. Non-partitioned and
306+
// perQueue-collapsed queues pass 0.
292307
func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition uint32) (*peekCursor, error) {
308+
effective := effectivePartitionCount(meta)
293309
if cursor == nil {
294310
out := &peekCursor{
295311
V: peekCursorSchemaV1,
296312
Generation: meta.Generation,
297313
}
298-
if meta.PartitionCount > 1 {
314+
if effective > 1 {
299315
out.StartPartition = startPartition
300316
out.Partition = startPartition
301317
}
@@ -305,18 +321,10 @@ func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition ui
305321
return nil, errors.Wrap(ErrAdminSQSValidation,
306322
"admin peek: cursor is from a prior generation; restart from the front")
307323
}
308-
maxPartition := meta.PartitionCount
309-
if maxPartition <= 1 {
310-
// Non-partitioned queues only have partition 0. A non-zero
311-
// Partition or StartPartition would also fail the rotation
312-
// termination check; reject explicitly so the error is the
313-
// documented 400, not a silent O(infty) loop.
314-
maxPartition = 1
315-
}
316-
if cursor.StartPartition >= maxPartition || cursor.Partition >= maxPartition {
324+
if cursor.StartPartition >= effective || cursor.Partition >= effective {
317325
return nil, errors.Wrapf(ErrAdminSQSValidation,
318326
"admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)",
319-
cursor.StartPartition, cursor.Partition, maxPartition)
327+
cursor.StartPartition, cursor.Partition, effective)
320328
}
321329
return cursor, nil
322330
}
@@ -350,11 +358,15 @@ func (s *SQSServer) walkPeek(
350358
cursor.LastKey = next
351359
return rows, cursor, nil
352360
}
353-
// Partition exhausted before Limit reached.
354-
if meta.PartitionCount <= 1 {
361+
// Partition exhausted before Limit reached. effectivePartitionCount
362+
// (not meta.PartitionCount) caps the rotation so perQueue-collapsed
363+
// FIFO queues stop after partition 0 instead of walking N-1
364+
// guaranteed-empty partitions (Codex r3 P2 on PR #794).
365+
effective := effectivePartitionCount(meta)
366+
if effective <= 1 {
355367
return rows, nil, nil
356368
}
357-
nextPart := (cursor.Partition + 1) % meta.PartitionCount
369+
nextPart := (cursor.Partition + 1) % effective
358370
if nextPart == cursor.StartPartition {
359371
return rows, nil, nil
360372
}

adapter/sqs_admin_peek_test.go

Lines changed: 115 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,30 +46,38 @@ func TestAdminPeekQueue_HappyPath(t *testing.T) {
4646
if nextCursor != "" {
4747
t.Fatalf("nextCursor=%q want empty (queue drained in one page)", nextCursor)
4848
}
49-
for i, row := range rows {
50-
assertPeekRowMatchesIndexedBody(t, i, row, "body-")
51-
}
49+
want := map[string]struct{}{"body-0": {}, "body-1": {}, "body-2": {}}
50+
assertPeekRowsAsSet(t, rows, want)
5251
}
5352

54-
// assertPeekRowMatchesIndexedBody pins the per-row invariants the
55-
// happy path expects: the body matches bodyPrefix+i, no truncation
56-
// fired (the bodies are short), MessageID is populated, and
57-
// SentTimestamp is non-zero. Pulled into a helper so
58-
// TestAdminPeekQueue_HappyPath stays under the cyclop budget.
59-
func assertPeekRowMatchesIndexedBody(t *testing.T, idx int, row AdminPeekedMessage, bodyPrefix string) {
53+
// assertPeekRowsAsSet pins the per-row invariants (MessageID
54+
// populated, SentTimestamp non-zero, no truncation) AND the set of
55+
// observed bodies, without coupling to row order. Vis-index entries
56+
// with the same visible_at millisecond tie-break on message_id which
57+
// is random, so an order-sensitive assertion is flaky under fast
58+
// sends — CodeRabbit r3 caught the earlier ordered-loop version.
59+
func assertPeekRowsAsSet(t *testing.T, rows []AdminPeekedMessage, want map[string]struct{}) {
6060
t.Helper()
61-
want := bodyPrefix + strconv.Itoa(idx)
62-
if row.Body != want {
63-
t.Fatalf("rows[%d].Body=%q want %q", idx, row.Body, want)
64-
}
65-
if row.BodyTruncated {
66-
t.Fatalf("rows[%d].BodyTruncated=true; want false for %d-byte body", idx, len(row.Body))
61+
got := make(map[string]struct{}, len(rows))
62+
for i, row := range rows {
63+
if row.BodyTruncated {
64+
t.Fatalf("rows[%d].BodyTruncated=true; want false for short body", i)
65+
}
66+
if row.MessageID == "" {
67+
t.Fatalf("rows[%d].MessageID empty", i)
68+
}
69+
if row.SentTimestamp.IsZero() {
70+
t.Fatalf("rows[%d].SentTimestamp zero", i)
71+
}
72+
got[row.Body] = struct{}{}
6773
}
68-
if row.MessageID == "" {
69-
t.Fatalf("rows[%d].MessageID empty", idx)
74+
for w := range want {
75+
if _, ok := got[w]; !ok {
76+
t.Fatalf("missing body %q in observed set %v", w, got)
77+
}
7078
}
71-
if row.SentTimestamp.IsZero() {
72-
t.Fatalf("rows[%d].SentTimestamp zero", idx)
79+
if len(got) != len(want) {
80+
t.Fatalf("observed set %v != expected set %v", got, want)
7381
}
7482
}
7583

@@ -826,6 +834,24 @@ func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) {
826834
})
827835
}
828836

837+
// loadFanoutCounter retrieves the per-queue receive fanout counter
838+
// the server's receive path uses for partition rotation. Returns the
839+
// counter or fails the test if it is absent / has an unexpected
840+
// type. Pulled into a helper so callers that read-then-compare the
841+
// counter value stay under the cyclop budget.
842+
func loadFanoutCounter(t *testing.T, node Node, queueName string) *atomic.Uint32 {
843+
t.Helper()
844+
v, ok := node.sqsServer.receiveFanoutCounters.Load(queueName)
845+
if !ok {
846+
t.Fatalf("fanout counter missing for queue %q (bump-on-first-page contract broken)", queueName)
847+
}
848+
counter, ok := v.(*atomic.Uint32)
849+
if !ok || counter == nil {
850+
t.Fatalf("fanout counter for %q has unexpected type %T (want *atomic.Uint32)", queueName, v)
851+
}
852+
return counter
853+
}
854+
829855
// TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter pins Codex
830856
// r2 P1: AdminPeekQueue must NOT advance the shared
831857
// receiveFanoutCounters counter on continuation pages. The counter
@@ -863,11 +889,7 @@ func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) {
863889
t.Fatalf("first peek: rows=%d cursor=%q — need both populated to exercise continuation", len(rows), cursor)
864890
}
865891

866-
v, ok := node.sqsServer.receiveFanoutCounters.Load(name)
867-
if !ok {
868-
t.Fatalf("first peek did not create the fanout counter; the bump-on-first-page contract is broken")
869-
}
870-
counter, _ := v.(*atomic.Uint32)
892+
counter := loadFanoutCounter(t, node, name)
871893
before := counter.Load()
872894

873895
// Drive 5 continuation calls; each must not move the counter.
@@ -887,6 +909,75 @@ func TestAdminPeekQueue_ContinuationDoesNotBumpFanoutCounter(t *testing.T) {
887909
}
888910
}
889911

912+
// TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition pins Codex
913+
// r3 P2: a partitioned FIFO queue in perQueue throughput mode
914+
// collapses every MessageGroupId to partition 0 (see
915+
// effectivePartitionCount). Peek must align with the data plane: it
916+
// walks only the effective partitions, not the raw PartitionCount
917+
// count, so a PartitionCount=4 perQueue queue does not waste 3
918+
// guaranteed-empty ScanAt calls on partitions 1..3 per peek call.
919+
//
920+
// Functional assertion: peek succeeds end-to-end on a perQueue queue
921+
// and surfaces every sent message. The optimization itself is
922+
// internal — a follow-up that re-introduces the meta.PartitionCount
923+
// rotation would still pass this assertion but the (Partition+1) %
924+
// effective termination encoded in walkPeek would still cap the
925+
// scan, so the regression vector is closed by the cursor codec
926+
// validation (effective bounds) being the chokepoint.
927+
func TestAdminPeekQueue_PerQueueFIFOCollapsesToOnePartition(t *testing.T) {
928+
t.Parallel()
929+
nodes, _, _ := createNode(t, 1)
930+
defer shutdown(nodes)
931+
node := sqsLeaderNode(t, nodes)
932+
933+
const name = "peek-perqueue.fifo"
934+
status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
935+
"QueueName": name,
936+
"Attributes": map[string]string{"FifoQueue": "true"},
937+
})
938+
if status != http.StatusOK {
939+
t.Fatalf("create: %d %v", status, out)
940+
}
941+
queueURL, _ := out["QueueUrl"].(string)
942+
installPartitionedMetaForTest(t, node, name, 4, htfifoThroughputPerQueue)
943+
944+
groups := []string{"alpha", "beta", "gamma"}
945+
sent := sendFIFOMessagesForPeek(t, node, queueURL, groups)
946+
seen := walkPeekUntilDone(t, context.Background(), node, name, 10)
947+
assertEveryMessageSeenOnce(t, sent, seen)
948+
}
949+
950+
// TestPreparePeekCursor_PerQueueCollapse confirms the cursor codec's
951+
// validation key off effectivePartitionCount, not the raw
952+
// meta.PartitionCount. A perQueue queue with PartitionCount=4 must
953+
// reject any cursor with Partition > 0 (the effective bound is 1)
954+
// even though meta.PartitionCount itself is 4.
955+
func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) {
956+
t.Parallel()
957+
958+
t.Run("fresh cursor on perQueue stamps StartPartition=0", func(t *testing.T) {
959+
t.Parallel()
960+
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}
961+
out, err := preparePeekCursor(nil, meta, 2)
962+
if err != nil {
963+
t.Fatalf("err: %v", err)
964+
}
965+
if out.StartPartition != 0 || out.Partition != 0 {
966+
t.Fatalf("perQueue fresh cursor=%+v want StartPartition=Partition=0 (effective=1)", out)
967+
}
968+
})
969+
970+
t.Run("perQueue rejects cursor.Partition >= 1", func(t *testing.T) {
971+
t.Parallel()
972+
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}
973+
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 2}
974+
_, err := preparePeekCursor(cursor, meta, 0)
975+
if !errors.Is(err, ErrAdminSQSValidation) {
976+
t.Fatalf("perQueue Partition=2 (raw PartitionCount=4 would allow): want ErrAdminSQSValidation, got %v", err)
977+
}
978+
})
979+
}
980+
890981
// TestAdminPeekQueue_HostileCursorBoundedRequest is the end-to-end
891982
// regression: an attacker crafts a wire-level cursor with an
892983
// out-of-range partition and submits it. The call MUST return

0 commit comments

Comments
 (0)