Skip to content

Commit ddb4e6e

Browse files
committed
adapter/sqs: validate peek cursor LastKey + add JSON tags to wire types
Two r4 findings on PR #794: 1. Codex P2 (sqs_admin_peek.go:401). preparePeekCursor only validated Generation and partition indices; cursor.LastKey was trusted directly as the ScanAt start key, checked only against the upper bound. A forged LastKey below the queue's visibility-index prefix would let ScanAt walk unrelated key ranges, triggering up to Limit extra GetAt misses per call - read amplification against the leader. preparePeekCursor now rejects with ErrAdminSQSValidation when cursor.LastKey is not prefixed by sqsMsgVisPrefixForQueueDispatch(meta, queueName, Partition, Generation). The check catches three classes of forgery: completely arbitrary bytes (e.g. "aaaa"), a valid LastKey from a different queue (prefix encodes queue name), and a valid LastKey from a different partition of the same queue. preparePeekCursor's signature gains queueName; the only existing caller (AdminPeekQueue) and the existing unit tests were updated. 2. CodeRabbit Major (sqs_admin_peek.go:35-45, outside diff). AdminPeekedMessage lacked JSON tags, so json.Marshal emitted Go-style PascalCase ("MessageID", "BodyTruncated", ...) instead of the snake_case wire shape the design doc §3.5 specifies (the SPA's client adapter expects the spec'd form). Also empty Attributes / GroupID / DeduplicationID would serialize as "null" / "" instead of being omitted. JSON tags added with appropriate omitempty. Caller audit (semantic-change rule): preparePeekCursor signature changed from (cursor, meta, startPartition) to (cursor, meta, queueName, startPartition). One non-test caller (AdminPeekQueue at sqs_admin_peek.go:240) and 10 test call sites updated. The new queueName parameter is purely additive validation; it does not alter the cursor's stamping or generation-mismatch behavior. Tests: TestAdminPeekedMessage_JSONWireFormat / TestAdminPeekedMessage_JSONOmitsEmptyAttributes pin the JSON wire shape. TestPreparePeekCursor_PartitionOutOfRange gains three new sub-cases (forged LastKey, foreign-queue LastKey, mismatched- partition LastKey). The test function was rewritten as a table-driven loop so it stays under the cyclop budget.
1 parent fc35ee5 commit ddb4e6e

2 files changed

Lines changed: 150 additions & 59 deletions

File tree

adapter/sqs_admin_peek.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,21 @@ type AdminPeekedAttribute struct {
3131
BinaryValue []byte `json:"binary_value,omitempty"`
3232
}
3333

34-
// AdminPeekedMessage is one row in the peek result.
34+
// AdminPeekedMessage is one row in the peek result. JSON tags pin
35+
// the snake_case wire shape the design doc §3.5 specifies; without
36+
// them the encoder would emit Go-style PascalCase field names and
37+
// the SPA's client adapter would silently misparse every row.
38+
// CodeRabbit r4 caught the regression.
3539
type AdminPeekedMessage struct {
36-
MessageID string
37-
Body string // truncated per opts.BodyMaxBytes
38-
BodyTruncated bool // true when Body was cut
39-
BodyOriginalSize int64 // bytes in the original body, for display
40-
SentTimestamp time.Time // SQS SentTimestamp
41-
ReceiveCount int32 // ApproximateReceiveCount
42-
GroupID string // FIFO MessageGroupId, empty for standard
43-
DeduplicationID string // FIFO MessageDeduplicationId, empty for standard
44-
Attributes map[string]AdminPeekedAttribute // typed SQS message attributes
40+
MessageID string `json:"message_id"`
41+
Body string `json:"body"` // truncated per opts.BodyMaxBytes
42+
BodyTruncated bool `json:"body_truncated"` // true when Body was cut
43+
BodyOriginalSize int64 `json:"body_original_size"` // bytes in the original body, for display
44+
SentTimestamp time.Time `json:"sent_timestamp"` // SQS SentTimestamp
45+
ReceiveCount int32 `json:"receive_count"` // ApproximateReceiveCount
46+
GroupID string `json:"group_id,omitempty"` // FIFO MessageGroupId, empty for standard
47+
DeduplicationID string `json:"deduplication_id,omitempty"` // FIFO MessageDeduplicationId, empty for standard
48+
Attributes map[string]AdminPeekedAttribute `json:"attributes,omitempty"` // typed SQS message attributes
4549
}
4650

4751
// AdminPeekMessageOptions controls a peek call. Zero values map to
@@ -233,7 +237,7 @@ func (s *SQSServer) AdminPeekQueue(
233237
if !exists {
234238
return nil, "", ErrAdminSQSNotFound
235239
}
236-
cursor, err = preparePeekCursor(cursor, meta, s.peekStartPartition(name, cursor, meta))
240+
cursor, err = preparePeekCursor(cursor, meta, name, s.peekStartPartition(name, cursor, meta))
237241
if err != nil {
238242
return nil, "", err
239243
}
@@ -304,7 +308,15 @@ func (s *SQSServer) peekStartPartition(queueName string, cursor *peekCursor, met
304308
// peekStartPartition wrapper around nextReceiveFanoutStart) so this
305309
// helper stays method-free for unit-testability. Non-partitioned and
306310
// perQueue-collapsed queues pass 0.
307-
func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition uint32) (*peekCursor, error) {
311+
//
312+
// queueName is consumed to validate cursor.LastKey on continuation
313+
// pages: a forged LastKey outside the queue's visibility-index
314+
// prefix would otherwise let an attacker start a ScanAt at any byte
315+
// offset in the leader's keyspace (Codex r4 P2). Continuation
316+
// cursors are admin-supplied and signed only by base64-encoding, so
317+
// every field must be validated against the live queue meta before
318+
// being used as a storage cursor.
319+
func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, queueName string, startPartition uint32) (*peekCursor, error) {
308320
effective := effectivePartitionCount(meta)
309321
if cursor == nil {
310322
out := &peekCursor{
@@ -326,6 +338,13 @@ func preparePeekCursor(cursor *peekCursor, meta *sqsQueueMeta, startPartition ui
326338
"admin peek: cursor partition index out of range (StartPartition=%d, Partition=%d, max=%d)",
327339
cursor.StartPartition, cursor.Partition, effective)
328340
}
341+
if len(cursor.LastKey) > 0 {
342+
expectedPrefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, cursor.Partition, meta.Generation)
343+
if !bytes.HasPrefix(cursor.LastKey, expectedPrefix) {
344+
return nil, errors.Wrap(ErrAdminSQSValidation,
345+
"admin peek: cursor LastKey is outside the queue's visibility-index prefix")
346+
}
347+
}
329348
return cursor, nil
330349
}
331350

adapter/sqs_admin_peek_test.go

Lines changed: 119 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,80 @@ func TestClampPeekBodyBytes(t *testing.T) {
727727
}
728728
}
729729

730+
// TestAdminPeekedMessage_JSONWireFormat pins the snake_case wire
731+
// shape the design doc §3.5 specifies. Without explicit JSON tags
732+
// the encoder emits Go-style PascalCase ("MessageID", "BodyTruncated"
733+
// …) and the SPA's client adapter silently misparses every row
734+
// (CodeRabbit r4 caught the regression).
735+
func TestAdminPeekedMessage_JSONWireFormat(t *testing.T) {
736+
t.Parallel()
737+
in := AdminPeekedMessage{
738+
MessageID: "m1",
739+
Body: "hello",
740+
BodyTruncated: true,
741+
BodyOriginalSize: 42,
742+
SentTimestamp: time.UnixMilli(1_700_000_000_000).UTC(),
743+
ReceiveCount: 3,
744+
GroupID: "g1",
745+
DeduplicationID: "d1",
746+
Attributes: map[string]AdminPeekedAttribute{
747+
"Source": {DataType: sqsAttributeBaseTypeString, StringValue: "checkout"},
748+
},
749+
}
750+
raw, err := json.Marshal(in)
751+
if err != nil {
752+
t.Fatalf("marshal: %v", err)
753+
}
754+
var got map[string]any
755+
if err := json.Unmarshal(raw, &got); err != nil {
756+
t.Fatalf("unmarshal: %v", err)
757+
}
758+
wantKeys := []string{
759+
"message_id", "body", "body_truncated", "body_original_size",
760+
"sent_timestamp", "receive_count", "group_id", "deduplication_id",
761+
"attributes",
762+
}
763+
for _, k := range wantKeys {
764+
if _, ok := got[k]; !ok {
765+
t.Fatalf("missing key %q in wire JSON; got=%v", k, got)
766+
}
767+
}
768+
// PascalCase keys must NOT appear.
769+
for _, k := range []string{"MessageID", "Body", "BodyTruncated", "GroupID", "Attributes"} {
770+
if _, ok := got[k]; ok {
771+
t.Fatalf("unwanted PascalCase key %q in wire JSON; got=%v", k, got)
772+
}
773+
}
774+
// Nested AdminPeekedAttribute also uses snake_case.
775+
attrs, _ := got["attributes"].(map[string]any)
776+
src, _ := attrs["Source"].(map[string]any)
777+
if _, ok := src["data_type"]; !ok {
778+
t.Fatalf("Source attribute missing 'data_type' key; got=%v", src)
779+
}
780+
}
781+
782+
// TestAdminPeekedMessage_JSONOmitsEmptyAttributes pins the
783+
// omitempty contract: empty Attributes / GroupID / DeduplicationID
784+
// must NOT appear on the wire (the SPA renders the field's absence,
785+
// not a "null" or "{}" placeholder).
786+
func TestAdminPeekedMessage_JSONOmitsEmptyAttributes(t *testing.T) {
787+
t.Parallel()
788+
in := AdminPeekedMessage{MessageID: "m1", Body: "x"}
789+
raw, err := json.Marshal(in)
790+
if err != nil {
791+
t.Fatalf("marshal: %v", err)
792+
}
793+
var got map[string]any
794+
if err := json.Unmarshal(raw, &got); err != nil {
795+
t.Fatalf("unmarshal: %v", err)
796+
}
797+
for _, k := range []string{"group_id", "deduplication_id", "attributes"} {
798+
if _, ok := got[k]; ok {
799+
t.Fatalf("empty %q must be omitted from wire JSON; got=%v", k, got)
800+
}
801+
}
802+
}
803+
730804
// TestPreparePeekCursor_FreshCursor confirms the first-call cursor
731805
// stamps Generation and (for partitioned queues) StartPartition.
732806
func TestPreparePeekCursor_FreshCursor(t *testing.T) {
@@ -735,7 +809,7 @@ func TestPreparePeekCursor_FreshCursor(t *testing.T) {
735809
t.Run("non-partitioned", func(t *testing.T) {
736810
t.Parallel()
737811
meta := &sqsQueueMeta{Generation: 7}
738-
out, err := preparePeekCursor(nil, meta, 0)
812+
out, err := preparePeekCursor(nil, meta, "test", 0)
739813
if err != nil {
740814
t.Fatalf("err: %v", err)
741815
}
@@ -747,7 +821,7 @@ func TestPreparePeekCursor_FreshCursor(t *testing.T) {
747821
t.Run("partitioned", func(t *testing.T) {
748822
t.Parallel()
749823
meta := &sqsQueueMeta{Generation: 11, PartitionCount: 4}
750-
out, err := preparePeekCursor(nil, meta, 2)
824+
out, err := preparePeekCursor(nil, meta, "test", 2)
751825
if err != nil {
752826
t.Fatalf("err: %v", err)
753827
}
@@ -763,7 +837,7 @@ func TestPreparePeekCursor_GenerationMismatch(t *testing.T) {
763837
t.Parallel()
764838
stale := &peekCursor{V: peekCursorSchemaV1, Generation: 3}
765839
meta := &sqsQueueMeta{Generation: 4}
766-
_, err := preparePeekCursor(stale, meta, 0)
840+
_, err := preparePeekCursor(stale, meta, "test", 0)
767841
if !errors.Is(err, ErrAdminSQSValidation) {
768842
t.Fatalf("want ErrAdminSQSValidation, got %v", err)
769843
}
@@ -779,52 +853,50 @@ func TestPreparePeekCursor_GenerationMismatch(t *testing.T) {
779853
// endpoint).
780854
func TestPreparePeekCursor_PartitionOutOfRange(t *testing.T) {
781855
t.Parallel()
782-
783-
t.Run("partitioned: StartPartition >= PartitionCount", func(t *testing.T) {
784-
t.Parallel()
785-
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4}
786-
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0}
787-
_, err := preparePeekCursor(cursor, meta, 0)
788-
if !errors.Is(err, ErrAdminSQSValidation) {
789-
t.Fatalf("StartPartition=99/PC=4: want ErrAdminSQSValidation, got %v", err)
790-
}
791-
})
792-
793-
t.Run("partitioned: Partition >= PartitionCount", func(t *testing.T) {
794-
t.Parallel()
795-
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4}
796-
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99}
797-
_, err := preparePeekCursor(cursor, meta, 0)
798-
if !errors.Is(err, ErrAdminSQSValidation) {
799-
t.Fatalf("Partition=99/PC=4: want ErrAdminSQSValidation, got %v", err)
800-
}
801-
})
802-
803-
t.Run("non-partitioned: non-zero StartPartition", func(t *testing.T) {
804-
t.Parallel()
805-
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 0}
806-
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0}
807-
_, err := preparePeekCursor(cursor, meta, 0)
808-
if !errors.Is(err, ErrAdminSQSValidation) {
809-
t.Fatalf("non-partitioned StartPartition=1: want ErrAdminSQSValidation, got %v", err)
810-
}
811-
})
812-
813-
t.Run("non-partitioned: non-zero Partition", func(t *testing.T) {
814-
t.Parallel()
815-
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 1}
816-
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5}
817-
_, err := preparePeekCursor(cursor, meta, 0)
818-
if !errors.Is(err, ErrAdminSQSValidation) {
819-
t.Fatalf("non-partitioned Partition=5: want ErrAdminSQSValidation, got %v", err)
820-
}
821-
})
856+
pc4 := &sqsQueueMeta{Generation: 1, PartitionCount: 4}
857+
pc0 := &sqsQueueMeta{Generation: 1, PartitionCount: 0}
858+
pc1 := &sqsQueueMeta{Generation: 1, PartitionCount: 1}
859+
// foreignKey / mismatchedKey share a "first byte not in the
860+
// admin prefix" property with "aaaa" so the test exercises all
861+
// three forged-LastKey classes the bounds check catches.
862+
foreignKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-X", 0, 1), 'X')
863+
mismatchedKey := append(sqsMsgVisPrefixForQueueDispatch(pc4, "queue-A", 2, 1), 'X')
864+
865+
cases := []struct {
866+
name string
867+
meta *sqsQueueMeta
868+
queue string
869+
cursor *peekCursor
870+
}{
871+
{"partitioned: StartPartition >= PartitionCount", pc4, "queue-A",
872+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 99, Partition: 0}},
873+
{"partitioned: Partition >= PartitionCount", pc4, "queue-A",
874+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 99}},
875+
{"non-partitioned: non-zero StartPartition", pc0, "queue-A",
876+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 1, Partition: 0}},
877+
{"non-partitioned: non-zero Partition", pc1, "queue-A",
878+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 5}},
879+
{"forged LastKey outside partition prefix (Codex r4 P2)", pc4, "queue-A",
880+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: []byte("aaaa")}},
881+
{"LastKey from a different queue", pc4, "queue-A",
882+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: foreignKey}},
883+
{"LastKey from a different partition", pc4, "queue-A",
884+
&peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 0, LastKey: mismatchedKey}},
885+
}
886+
for _, tc := range cases {
887+
t.Run(tc.name, func(t *testing.T) {
888+
t.Parallel()
889+
_, err := preparePeekCursor(tc.cursor, tc.meta, tc.queue, 0)
890+
if !errors.Is(err, ErrAdminSQSValidation) {
891+
t.Fatalf("%s: want ErrAdminSQSValidation, got %v", tc.name, err)
892+
}
893+
})
894+
}
822895

823896
t.Run("partitioned: in-range cursor accepted", func(t *testing.T) {
824897
t.Parallel()
825-
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4}
826898
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 3, Partition: 2}
827-
out, err := preparePeekCursor(cursor, meta, 0)
899+
out, err := preparePeekCursor(cursor, pc4, "queue-A", 0)
828900
if err != nil {
829901
t.Fatalf("in-range cursor: got err %v want nil", err)
830902
}
@@ -958,7 +1030,7 @@ func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) {
9581030
t.Run("fresh cursor on perQueue stamps StartPartition=0", func(t *testing.T) {
9591031
t.Parallel()
9601032
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}
961-
out, err := preparePeekCursor(nil, meta, 2)
1033+
out, err := preparePeekCursor(nil, meta, "test", 2)
9621034
if err != nil {
9631035
t.Fatalf("err: %v", err)
9641036
}
@@ -971,7 +1043,7 @@ func TestPreparePeekCursor_PerQueueCollapse(t *testing.T) {
9711043
t.Parallel()
9721044
meta := &sqsQueueMeta{Generation: 1, PartitionCount: 4, FifoThroughputLimit: htfifoThroughputPerQueue}
9731045
cursor := &peekCursor{V: peekCursorSchemaV1, Generation: 1, StartPartition: 0, Partition: 2}
974-
_, err := preparePeekCursor(cursor, meta, 0)
1046+
_, err := preparePeekCursor(cursor, meta, "test", 0)
9751047
if !errors.Is(err, ErrAdminSQSValidation) {
9761048
t.Fatalf("perQueue Partition=2 (raw PartitionCount=4 would allow): want ErrAdminSQSValidation, got %v", err)
9771049
}

0 commit comments

Comments
 (0)