Skip to content

Commit fbd68a6

Browse files
committed
fix(sqs/htfifo): address PR #681 review feedback (Gemini medium x3)
Three medium-priority Gemini findings + one cyclop fix from the rebase onto the updated PR #679 throttling branch. (1) sqsErrInvalidParameterValue removed (sqs_catalog.go, sqs_partitioning.go, sqs_partitioning_test.go): the constant duplicated sqsErrValidation which already carries "InvalidParameterValue". Reuses the existing constant; comment on the validator clarifies the choice. (2) snapshotImmutableHTFIFO heap allocation gated on htfifoAttributesPresent (sqs_catalog.go): the snapshot is only needed when the request actually carries an HT-FIFO attribute. A SetQueueAttributes that touches only mutable attributes (e.g. VisibilityTimeout — the common path) now skips the allocation entirely. The validator short-circuit on preApply == nil keeps correctness identical: a request with no HT-FIFO attribute can never change an HT-FIFO field. (3) partitionFor inlined FNV-1a (sqs_partitioning.go): the hash/fnv.New64a + h.Write([]byte(messageGroupID)) path heap- allocates the byte slice and pays the hash.Hash interface dispatch on every SendMessage. Replaced with a manual FNV-1a loop over the string — measurably faster on the routing hot path. MessageGroupId is capped at 128 chars by validation so the loop is bounded. (4) cyclop (trySetQueueAttributesOnce): the conditional preApply branch + immutability check + Throttle validator pushed cyclop to 12. Extracted applyAndValidateSetAttributes so the function stays under the ceiling; the helper is also useful for any future admin-surface callers that want the same apply+validate flow without the OCC dispatch. All tests pass under -race; golangci-lint clean.
1 parent d5c0696 commit fbd68a6

3 files changed

Lines changed: 59 additions & 37 deletions

File tree

adapter/sqs_catalog.go

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ const (
6060
sqsErrQueueDoesNotExist = "AWS.SimpleQueueService.NonExistentQueue"
6161
sqsErrInvalidAttributeName = "InvalidAttributeName"
6262
sqsErrInvalidAttributeValue = "InvalidAttributeValue"
63-
// sqsErrInvalidParameterValue is the AWS code for incoherent
64-
// parameter combinations (vs malformed individual values, which
65-
// use sqsErrInvalidAttributeValue). HT-FIFO uses this for the
66-
// {PartitionCount > 1, DeduplicationScope = "queue"} cross-
67-
// attribute rejection.
68-
sqsErrInvalidParameterValue = "InvalidParameterValue"
6963
)
7064

7165
var sqsQueueNamePattern = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,80}(\.fifo)?$`)
@@ -1376,43 +1370,56 @@ func (s *SQSServer) setQueueAttributesWithRetry(ctx context.Context, queueName s
13761370
return newSQSAPIError(http.StatusInternalServerError, sqsErrInternalFailure, "set queue attributes retry attempts exhausted")
13771371
}
13781372

1379-
// trySetQueueAttributesOnce is one read-validate-commit pass. The first
1380-
// return reports whether the caller should stop retrying (the attrs
1381-
// are now committed); an error means either a non-retryable failure
1382-
// (propagate) or a retryable write conflict (retry after backoff).
1383-
func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName string, attrs map[string]string) (bool, error) {
1384-
readTS := s.nextTxnReadTS(ctx)
1385-
meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS)
1386-
if err != nil {
1387-
return false, errors.WithStack(err)
1388-
}
1389-
if !exists {
1390-
return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
1373+
// applyAndValidateSetAttributes runs the apply + cross-validator
1374+
// chain for a SetQueueAttributes request. Extracted from
1375+
// trySetQueueAttributesOnce so that function stays under the cyclop
1376+
// ceiling once HT-FIFO immutability + Throttle validators were
1377+
// added. Returns nil on success; on rejection returns the typed
1378+
// sqsAPIError the caller forwards to writeSQSErrorFromErr.
1379+
//
1380+
// preApply snapshot allocation is gated on htfifoAttributesPresent
1381+
// so the common "mutable-only update" path stays alloc-free per the
1382+
// Gemini medium feedback on PR #681.
1383+
func applyAndValidateSetAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
1384+
var preApply *sqsQueueMeta
1385+
if htfifoAttributesPresent(attrs) {
1386+
preApply = snapshotImmutableHTFIFO(meta)
13911387
}
1392-
// Snapshot the on-disk values of the immutable HT-FIFO fields
1393-
// before applying the request so the immutability check has a
1394-
// clean before/after pair. SetQueueAttributes is all-or-nothing
1395-
// per §3.2: if any immutable attribute carries a differing value,
1396-
// the entire request is rejected before any attribute is
1397-
// persisted (including mutable attributes in the same call).
1398-
preApply := snapshotImmutableHTFIFO(meta)
13991388
if err := applyAttributes(meta, attrs); err != nil {
1400-
return false, err
1389+
return err
14011390
}
1402-
if err := validatePartitionImmutability(preApply, meta); err != nil {
1403-
return false, err
1391+
if preApply != nil {
1392+
if err := validatePartitionImmutability(preApply, meta); err != nil {
1393+
return err
1394+
}
14041395
}
14051396
// ContentBasedDeduplication is FIFO-only; a Standard queue
14061397
// silently accepting it would advertise unsupported behavior to
14071398
// clients. Same rule enforced on CreateQueue.
14081399
if meta.ContentBasedDedup && !meta.IsFIFO {
1409-
return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
1400+
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
14101401
}
14111402
// HT-FIFO schema validator runs after applyAttributes so the
14121403
// FIFO-only checks see the post-apply state. IsFIFO comes from
14131404
// the loaded meta record (immutable from CreateQueue) so the
14141405
// validator sees the same flag CreateQueue set.
1415-
if err := validatePartitionConfig(meta); err != nil {
1406+
return validatePartitionConfig(meta)
1407+
}
1408+
1409+
// trySetQueueAttributesOnce is one read-validate-commit pass. The first
1410+
// return reports whether the caller should stop retrying (the attrs
1411+
// are now committed); an error means either a non-retryable failure
1412+
// (propagate) or a retryable write conflict (retry after backoff).
1413+
func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName string, attrs map[string]string) (bool, error) {
1414+
readTS := s.nextTxnReadTS(ctx)
1415+
meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS)
1416+
if err != nil {
1417+
return false, errors.WithStack(err)
1418+
}
1419+
if !exists {
1420+
return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
1421+
}
1422+
if err := applyAndValidateSetAttributes(meta, attrs); err != nil {
14161423
return false, err
14171424
}
14181425
meta.LastModifiedAtMillis = time.Now().UnixMilli()

adapter/sqs_partitioning.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package adapter
22

33
import (
4-
"hash/fnv"
54
"net/http"
65
"strconv"
76
)
@@ -80,14 +79,27 @@ func partitionFor(meta *sqsQueueMeta, messageGroupID string) uint32 {
8079
if messageGroupID == "" {
8180
return 0
8281
}
83-
h := fnv.New64a()
84-
_, _ = h.Write([]byte(messageGroupID))
82+
// Inlined FNV-1a over the string to avoid the []byte allocation
83+
// hash/fnv.New64a + h.Write would force (Gemini medium on PR
84+
// #681). MessageGroupId is capped at 128 chars by validation, so
85+
// this loop bounds at 128 iterations of integer arithmetic per
86+
// SendMessage — measurably faster than the hash.Hash interface
87+
// path on the routing hot path.
88+
const (
89+
fnv64Offset uint64 = 14695981039346656037
90+
fnv64Prime uint64 = 1099511628211
91+
)
92+
hash := fnv64Offset
93+
for i := 0; i < len(messageGroupID); i++ {
94+
hash ^= uint64(messageGroupID[i])
95+
hash *= fnv64Prime
96+
}
8597
// PartitionCount is a power of two (validator-enforced); mod is
8698
// equivalent to mask-AND. The mask is meta.PartitionCount - 1.
8799
// Computing the mask in uint64 first then narrowing to uint32 is
88100
// safe because htfifoMaxPartitions == 32 fits in uint32 trivially.
89101
mask := uint64(meta.PartitionCount - 1)
90-
return uint32(h.Sum64() & mask) //nolint:gosec // masked by (PartitionCount - 1) ≤ htfifoMaxPartitions − 1, fits in uint32.
102+
return uint32(hash & mask) //nolint:gosec // masked by (PartitionCount - 1) ≤ htfifoMaxPartitions − 1, fits in uint32.
91103
}
92104

93105
// isPowerOfTwo returns true when n is a positive power of two.
@@ -140,7 +152,10 @@ func validatePartitionConfig(meta *sqsQueueMeta) error {
140152
}
141153
}
142154
if meta.PartitionCount > 1 && meta.DeduplicationScope == htfifoDedupeScopeQueue {
143-
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidParameterValue,
155+
// sqsErrValidation is "InvalidParameterValue" (Gemini medium
156+
// on PR #681 — uses the existing constant rather than a
157+
// duplicate-value alias).
158+
return newSQSAPIError(http.StatusBadRequest, sqsErrValidation,
144159
"queue-scoped deduplication is incompatible with multi-partition FIFO because the dedup key cannot be globally unique across partitions without a cross-partition OCC transaction")
145160
}
146161
return nil

adapter/sqs_partitioning_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ func TestValidatePartitionConfig_RejectsQueueScopedDedupOnPartitioned(t *testing
187187
require.Error(t, err)
188188
var apiErr *sqsAPIError
189189
require.True(t, errors.As(err, &apiErr), "expected sqsAPIError, got %T", err)
190-
require.Equal(t, sqsErrInvalidParameterValue, apiErr.errorType,
191-
"the cross-attribute rejection must use InvalidParameterValue (incoherent params), not InvalidAttributeValue (malformed individual value)")
190+
require.Equal(t, sqsErrValidation, apiErr.errorType,
191+
"the cross-attribute rejection must use InvalidParameterValue (incoherent params, sqsErrValidation), not InvalidAttributeValue (malformed individual value)")
192192
// Single-partition + queue-scoped dedup is fine (legacy behaviour).
193193
require.NoError(t, validatePartitionConfig(&sqsQueueMeta{
194194
IsFIFO: true,

0 commit comments

Comments
 (0)