
Commit e3530e9

fix(sqs/throttle): address PR #679 review feedback (Codex P1/P2 + Gemini high/medium)
Six fixes from the first round of automated reviews on PR #679:

- Codex P1 -- bucket reconciliation on stale config (sqs_throttle.go): loadOrInit returned cached buckets without checking that capacity / refillRate still match the queue's current Throttle config. After a leadership change, a node retaining buckets from a prior leader term would keep enforcing the prior term's limits even after a new SetQueueAttributes had committed -- the invalidation only runs on the leader that processed the commit, so a different leader's stale buckets survive. Now compares cap/refill on every Load hit and rebuilds (Delete + LoadOrStore) on mismatch.

- Codex P1 -- invalidate only on actual throttle change (sqs_catalog.go, sqs_throttle.go): cache invalidation in setQueueAttributes ran unconditionally after every successful commit, including unrelated-field updates and no-op writes. Result: any caller could silently restore a noisy tenant's burst capacity by writing a no-op SetQueueAttributes. Now gated on throttleAttributesPresent(in.Attributes), which checks the request for any Throttle* key. The bucket reconciliation above acts as the safety net if a future code path bypasses the gate.

- Codex P2 -- attributesEqual covers Throttle (sqs_catalog.go): CreateQueue idempotency relied on attributesEqual, which did not include Throttle*. A re-create with different limits was treated as idempotent and silently kept the old limits. Now compares the full Throttle struct via throttleConfigEqual; baseAttributesEqual extracted to keep cyclop under the ceiling.

- Gemini high -- thread throttle through existing meta load (sqs_messages.go, sqs_throttle.go): chargeQueue did one Pebble read per request even though the hot-path handlers (sendMessage, receiveMessage) load the meta moments later. Added chargeQueueWithThrottle, which takes pre-loaded throttle config; both hot-path handlers now load meta once and pass throttle in. The throttle check now sits AFTER the QueueDoesNotExist branch, so a missing queue no longer consumes a token. Batch + delete handlers keep chargeQueue (one extra meta read) -- low-QPS verbs where the simplification of not pulling meta out of the retry loop is worth the per-call cost.

- Gemini high -- move sweep off hot path (sqs.go, sqs_throttle.go): maybeSweep ran the O(N) sync.Map.Range on whichever request was unlucky enough to trigger the per-minute sweep, causing latency spikes on many-queue clusters. Replaced with runSweepLoop on a background ticker tied to s.reaperCtx (started in Run alongside the existing message reaper, cleaned up by the same reaperCancel in Stop). The hot-path charge() no longer calls into the sweep at all.

- Gemini medium -- cap retry-after duration (sqs_throttle.go): computeRetryAfter could compute a multi-day Retry-After (or worse, overflow time.Duration arithmetic) for pathologically small refillRate / large requested values. Capped at throttleRetryAfterCap (1h, matching the bucket idle-evict window). The cap is applied before the Duration multiplication, so overflow is impossible.

New tests:
- TestBucketStore_ReconcilesBucketOnConfigChange pins the Codex P1 reconciliation contract.
- TestComputeRetryAfter_CapsAtMaximum pins the Gemini medium cap.
- TestThrottleAttributesPresent covers the request-gate helper used by the conditional invalidation.

All tests pass under -race; golangci-lint clean.
1 parent e105815 commit e3530e9

5 files changed

Lines changed: 252 additions & 40 deletions


adapter/sqs.go

Lines changed: 4 additions & 0 deletions
@@ -143,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
 
 func (s *SQSServer) Run() error {
 	s.startReaper(s.reaperCtx)
+	// Throttle bucket idle-evict runs on a background ticker so the
+	// request hot path never pays the O(N) sweep cost. Cleaned up by
+	// the same reaperCtx cancellation that stops the message reaper.
+	go s.throttle.runSweepLoop(s.reaperCtx)
 	if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
 		return errors.WithStack(err)
 	}

adapter/sqs_catalog.go

Lines changed: 40 additions & 7 deletions
@@ -640,6 +640,13 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
 	if a == nil || b == nil {
 		return false
 	}
+	return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle)
+}
+
+// baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set.
+// Split from attributesEqual so adding fields per phase does not
+// push the function over the cyclop ceiling.
+func baseAttributesEqual(a, b *sqsQueueMeta) bool {
 	return a.IsFIFO == b.IsFIFO &&
 		a.ContentBasedDedup == b.ContentBasedDedup &&
 		a.VisibilityTimeoutSeconds == b.VisibilityTimeoutSeconds &&
@@ -650,6 +657,27 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
 		a.RedrivePolicy == b.RedrivePolicy
 }
 
+// throttleConfigEqual compares two Throttle configs for the
+// CreateQueue idempotency check. Without including the throttle
+// fields in attributesEqual, a re-create with different limits would
+// be treated as idempotent and silently keep the old limits.
+func throttleConfigEqual(a, b *sqsQueueThrottle) bool {
+	aEmpty := a.IsEmpty()
+	bEmpty := b.IsEmpty()
+	if aEmpty && bEmpty {
+		return true
+	}
+	if aEmpty != bEmpty {
+		return false
+	}
+	return a.SendCapacity == b.SendCapacity &&
+		a.SendRefillPerSecond == b.SendRefillPerSecond &&
+		a.RecvCapacity == b.RecvCapacity &&
+		a.RecvRefillPerSecond == b.RecvRefillPerSecond &&
+		a.DefaultCapacity == b.DefaultCapacity &&
+		a.DefaultRefillPerSecond == b.DefaultRefillPerSecond
+}
+
 // ------------------------ storage primitives ------------------------
 
 func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 {
@@ -1205,13 +1233,18 @@ func (s *SQSServer) setQueueAttributes(w http.ResponseWriter, r *http.Request) {
 	}
 	// Drop the in-memory bucket entries belonging to this queue *after*
 	// the Raft commit so the next request rebuilds from the freshly
-	// committed throttle config. Without this step the old limits keep
-	// being enforced until the idle-evict sweep removes the stale
-	// entry — defeating the operator's intent to throttle a noisy
-	// tenant in real time. The LoadOrStore race a concurrent in-flight
-	// request might run with the stale bucket is benign: the rebuilt
-	// bucket starts at full capacity, same as failover semantics.
-	s.throttle.invalidateQueue(name)
+	// committed throttle config. Gated on whether the request actually
+	// touched a Throttle* attribute — an unconditional invalidate
+	// would reset the bucket on every unrelated SetQueueAttributes
+	// (e.g. VisibilityTimeout-only update), giving any caller a way to
+	// silently restore a noisy tenant's burst capacity by writing a
+	// no-op SetQueueAttributes (Codex P1 on PR #679). The bucket
+	// reconciliation in loadOrInit also catches a stale bucket if a
+	// throttle change slips past this gate (e.g. via a future admin
+	// path), so the gating here is purely a hot-path optimisation.
+	if throttleAttributesPresent(in.Attributes) {
+		s.throttle.invalidateQueue(name)
+	}
 	writeSQSJSON(w, map[string]any{})
 }

adapter/sqs_messages.go

Lines changed: 25 additions & 13 deletions
@@ -294,10 +294,10 @@ type sqsChangeVisibilityInput struct {
 
 // ------------------------ handlers ------------------------
 
-// prepareSendMessage decodes the SendMessage payload, resolves the
-// queue name, and runs the throttle charge. Returning early on any
-// failure keeps sendMessage under the cyclop ceiling — without this
-// extraction the throttle branch pushes the function over the limit.
+// prepareSendMessage decodes the SendMessage payload and resolves
+// the queue name. Throttle charging happens after the meta load in
+// validateSend so we don't pay an extra meta read just to discover
+// throttling is off (Gemini high on PR #679).
 func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (sqsSendMessageInput, string, bool) {
 	var in sqsSendMessageInput
 	if err := decodeSQSJSONInput(r, &in); err != nil {
@@ -309,21 +309,29 @@ func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (
 		writeSQSErrorFromErr(w, err)
 		return in, "", false
 	}
-	if !s.chargeQueue(w, r, queueName, bucketActionSend, 1) {
-		return in, queueName, false
-	}
 	return in, queueName, true
 }
 
-// validateSend loads queue meta, validates message attributes / FIFO
-// params, and resolves the delay. Returns ok=false if any validation
-// step has already written the error response.
+// validateSend loads queue meta, runs the throttle charge against
+// the loaded throttle config (no extra meta read), then validates
+// message attributes / FIFO params and resolves the delay. Returns
+// ok=false if any step has already written the error response.
+//
+// Throttle check sits AFTER the meta load (so we have the throttle
+// config) and AFTER the QueueDoesNotExist branch (so a missing
+// queue is reported as 400 QueueDoesNotExist, not as a Throttling
+// 400 against a non-existent bucket). It still sits OUTSIDE the
+// OCC transaction (§4.2): a rejected request never reaches the
+// coordinator.
 func (s *SQSServer) validateSend(w http.ResponseWriter, r *http.Request, queueName string, in sqsSendMessageInput) (*sqsQueueMeta, uint64, int64, bool) {
 	meta, readTS, apiErr := s.loadQueueMetaForSend(r.Context(), queueName, []byte(in.MessageBody))
 	if apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
 		return nil, 0, 0, false
 	}
+	if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle) {
+		return nil, 0, 0, false
+	}
 	if apiErr := validateMessageAttributes(in.MessageAttributes); apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
 		return nil, 0, 0, false
@@ -535,9 +543,6 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
-	if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
-		return
-	}
 	ctx := r.Context()
 
 	// Use LeaseRead to fence this scan against a leader that silently lost
@@ -559,6 +564,13 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) {
 		writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
 		return
 	}
+	// Throttle check uses the loaded meta's throttle config so we
+	// don't pay an extra meta read just to discover throttling is off
+	// (Gemini high on PR #679). Sits AFTER the QueueDoesNotExist
+	// branch — a missing queue should not consume a Recv token.
+	if !s.chargeQueueWithThrottle(w, queueName, bucketActionReceive, 1, meta.Throttle) {
+		return
+	}
 	max, maxErr := resolveReceiveMaxMessages(in.MaxNumberOfMessages)
 	if maxErr != nil {
 		writeSQSErrorFromErr(w, maxErr)

adapter/sqs_throttle.go

Lines changed: 128 additions & 20 deletions
@@ -1,6 +1,7 @@
 package adapter
 
 import (
+	"context"
 	"math"
 	"net/http"
 	"sync"
@@ -30,6 +31,34 @@
 // invalidate it.
 var throttleAllActions = []string{bucketActionSend, bucketActionReceive, bucketActionAny}
 
+// throttleAttributeNames is the wire-side set of Throttle*
+// attributes a SetQueueAttributes request can carry. Used by the
+// invalidation gate in setQueueAttributes so an unrelated update
+// (e.g. VisibilityTimeout only) does not pay the cache invalidation
+// cost or, worse, give the caller a way to silently reset bucket
+// state via a no-op SetQueueAttributes (Codex P1 on PR #679).
+var throttleAttributeNames = []string{
+	"ThrottleSendCapacity",
+	"ThrottleSendRefillPerSecond",
+	"ThrottleRecvCapacity",
+	"ThrottleRecvRefillPerSecond",
+	"ThrottleDefaultCapacity",
+	"ThrottleDefaultRefillPerSecond",
+}
+
+// throttleAttributesPresent reports whether attrs carries any
+// Throttle* key. Cheap O(6) check; the throttleAttributeNames slice
+// is the source of truth so a future Throttle* attribute name added
+// in one place automatically participates in the gate.
+func throttleAttributesPresent(attrs map[string]string) bool {
+	for _, k := range throttleAttributeNames {
+		if _, ok := attrs[k]; ok {
+			return true
+		}
+	}
+	return false
+}
+
 // throttleHardCeilingPerSecond bounds any user-supplied capacity or
 // refill rate. A typo like SendCapacity=1e9 silently meaning "no limit"
 // is more dangerous than an explicit InvalidAttributeValue (Codex P1 on
@@ -147,7 +176,6 @@ func (b *bucketStore) charge(cfg *sqsQueueThrottle, queue, action string, count
 	if count < 1 {
 		count = 1
 	}
-	b.maybeSweep()
 	// Bucket key uses the *resolved* action so Send-falls-through-to-
 	// Default and Recv-falls-through-to-Default share the same Default
 	// bucket. Without the resolution, an operator who configures only
@@ -191,12 +219,40 @@ func (b *bucketStore) charge(cfg *sqsQueueThrottle, queue, action string, count
 // safe because both racers compute identical (capacity, refillRate)
 // from the same meta snapshot — the bucket they would build is
 // behaviourally interchangeable.
+//
+// Reconciliation against stale config (Codex P1 on PR #679): if a
+// cached bucket's capacity/refillRate differ from the cfg's current
+// values, the bucket is replaced with a fresh one built from the
+// current config. Without this check, a node that lost leadership
+// during a SetQueueAttributes commit and then regained leadership
+// later would keep enforcing the prior leader-term's limits — the
+// SetQueueAttributes invalidation only runs on the leader that
+// processed the commit, so a different leader's stale buckets
+// survive. The reconciliation also covers the case where the
+// invalidation gate in setQueueAttributes is bypassed (e.g. by a
+// future admin path that mutates throttle config without touching
+// SetQueueAttributes).
 func (b *bucketStore) loadOrInit(queue, action string, capacity, refill float64) *tokenBucket {
 	key := bucketKey{queue: queue, action: action}
 	if v, ok := b.buckets.Load(key); ok {
 		// type assertion is sound: only tokenBucket pointers are stored.
 		bucket, _ := v.(*tokenBucket)
-		return bucket
+		// Cheap field comparison under the bucket's own lock — if the
+		// cached bucket matches the current config we return it
+		// directly. A mismatch means the on-disk meta moved while
+		// this node held a stale bucket; rebuild from the current
+		// config (full capacity, matching the failover semantics).
+		bucket.mu.Lock()
+		matches := bucket.capacity == capacity && bucket.refillRate == refill
+		bucket.mu.Unlock()
+		if matches {
+			return bucket
+		}
+		b.buckets.Delete(key)
+		// fall through to LoadOrStore — a concurrent racer might
+		// have already inserted a fresh bucket with the current
+		// config, in which case LoadOrStore picks it up and the new
+		// bucket below is discarded.
 	}
 	now := b.clock()
 	fresh := &tokenBucket{
@@ -225,20 +281,38 @@
 	}
 }
 
-// maybeSweep walks the bucket store dropping any bucket idle longer
-// than evictedAfter. Runs at most once per sweepEvery from the hot
-// path so a many-queue cluster does not pay the O(N) cost on every
-// request.
-func (b *bucketStore) maybeSweep() {
-	if b.evictedAfter <= 0 {
+// runSweepLoop runs the idle-evict sweep on a background ticker so
+// the request hot path never pays the O(N) sync.Map.Range cost
+// (Gemini high on PR #679: a many-queue cluster would see latency
+// spikes on whichever request was unlucky enough to trigger the
+// per-minute on-hot-path sweep). Returns when ctx is done — the
+// SQSServer wires this to s.reaperCtx so a Stop() call cleans the
+// goroutine up alongside the existing reaper.
+func (b *bucketStore) runSweepLoop(ctx context.Context) {
+	if b == nil || b.evictedAfter <= 0 || b.sweepEvery <= 0 {
 		return
 	}
-	b.sweepMu.Lock()
-	now := b.clock()
-	if now.Sub(b.lastSweep) < b.sweepEvery {
-		b.sweepMu.Unlock()
-		return
+	t := time.NewTicker(b.sweepEvery)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			b.sweep()
+		}
 	}
+}
+
+// sweep walks the bucket store dropping any bucket idle longer than
+// evictedAfter. Called from runSweepLoop on a background ticker.
+// Bucket lookups are still O(1) on the hot path; sweep iterates
+// every entry under the per-bucket lock (held briefly for the
+// timestamp read) so it never blocks a charge() that already has
+// the bucket.
+func (b *bucketStore) sweep() {
+	now := b.clock()
+	b.sweepMu.Lock()
 	b.lastSweep = now
 	b.sweepMu.Unlock()
 	cutoff := now.Add(-b.evictedAfter)
@@ -277,6 +351,17 @@ func resolveActionConfig(cfg *sqsQueueThrottle, action string) (string, float64,
 	return action, 0, 0
 }
 
+// throttleRetryAfterCap bounds the Retry-After value the client sees
+// (Gemini medium on PR #679). Without a cap, a tiny refillRate plus
+// a large requested count would compute a multi-day wait — and
+// time.Duration arithmetic can overflow at the upper end. One hour
+// matches the bucket store's idle-evict window: by the time the
+// suggested retry would otherwise expire, the bucket would have
+// been evicted and rebuilt at full capacity anyway, so a longer
+// suggestion is meaningless. Producers that hit the cap are also
+// strongly mis-configured; capping is a guard rail, not a feature.
+const throttleRetryAfterCap = time.Hour
+
 // computeRetryAfter implements the §3.4 formula:
 //
 //	needed := requested - currentTokens
@@ -287,6 +372,9 @@ func resolveActionConfig(cfg *sqsQueueThrottle, action string) (string, float64,
 // verbs, len(Entries) for batch verbs). The min-1 floor matches the
 // HTTP/1.1 §10.2.3 integer-second granularity. The validator keeps
 // refillRate > 0 so no divide-by-zero guard is needed.
+//
+// Capped at throttleRetryAfterCap to bound time.Duration arithmetic
+// against pathologically small refillRate / large requested values.
 func computeRetryAfter(requested, current, refillRate float64) time.Duration {
 	needed := requested - current
 	if needed <= 0 {
@@ -299,6 +387,12 @@ func computeRetryAfter(requested, current, refillRate float64) time.Duration {
 	if secs < 1 {
 		secs = 1
 	}
+	// Cap before multiplying to avoid time.Duration overflow on
+	// pathological inputs (e.g. refillRate just above zero).
+	const capSecs = float64(throttleRetryAfterCap / time.Second)
+	if secs > capSecs {
+		secs = capSecs
+	}
 	return time.Duration(secs) * time.Second
 }

@@ -314,13 +408,15 @@ func throttleChargeCount(entries int) int {
 	return entries
 }
 
-// chargeQueue is the per-handler entry point. It loads the queue meta
-// at a fresh read timestamp (Pebble cache makes this cheap) and runs
-// the bucket store's charge against the queue's Throttle config. On
-// rejection it writes the Throttling envelope (400 + Retry-After +
-// AWS-shaped JSON body) and returns false; the caller short-circuits.
-// On allow it returns true and the caller continues with the existing
-// OCC dispatch.
+// chargeQueue is the per-handler entry point used by handlers that
+// do not already load the queue meta themselves (deleteMessage,
+// changeMessageVisibility, and their batch siblings). It loads the
+// meta at a fresh read timestamp (Pebble cache makes this cheap) and
+// runs the bucket store's charge against the queue's Throttle config.
+//
+// Handlers that DO load the meta themselves (sendMessage,
+// sendMessageBatch, receiveMessage) should use chargeQueueWithThrottle
+// to avoid the redundant load (Gemini high on PR #679).
 //
 // chargeQueue intentionally swallows missing-queue errors: the caller
 // is going to discover that the queue does not exist a few lines
@@ -336,6 +432,18 @@ func (s *SQSServer) chargeQueue(w http.ResponseWriter, r *http.Request, queueNam
 		return true
 	}
 	throttle := s.queueThrottleConfig(r, queueName)
+	return s.chargeQueueWithThrottle(w, queueName, action, count, throttle)
+}
+
+// chargeQueueWithThrottle is the variant for handlers that already
+// have the throttle config in hand from their own meta load. Drops
+// the per-request meta load chargeQueue does, addressing the Gemini
+// high finding on PR #679 about redundant storage reads on the hot
+// path.
+func (s *SQSServer) chargeQueueWithThrottle(w http.ResponseWriter, queueName, action string, count int, throttle *sqsQueueThrottle) bool {
+	if s.throttle == nil {
+		return true
+	}
 	outcome := s.throttle.charge(throttle, queueName, action, count)
 	if outcome.allowed {
 		return true
