Skip to content

Commit c6d3a68

Browse files
committed
fix(sqs/throttle): drop dead sweep state + add throttle idempotency test (PR #679 round 2)
Two items from the round-2 Claude review on PR #679: (1) bucketStore.sweepMu and bucketStore.lastSweep became dead code when the move from on-hot-path maybeSweep to the background runSweepLoop landed in round 1. The sweep() ticker is the only caller, so the serialisation primitive that protected against concurrent on-hot-path callers no longer has a job. Removed both fields and the sweepMu.Lock/lastSweep=now/sweepMu.Unlock block; sweep() now reads b.clock() inline. Field comment updated to describe the post-runSweepLoop design (single-goroutine driver). (2) TestSQSServer_CatalogCreateIsIdempotent now exercises throttleConfigEqual via a fourth case: same name, same VisibilityTimeout, but the second create adds Throttle* attributes. attributesEqual must notice the diff and the call must reject as QueueNameExists. Without this case a future bug in throttleConfigEqual (e.g. always returning true) would slip past the existing VisibilityTimeout-only test. Both findings noted in the round-2 Claude review on PR #679.
1 parent e3530e9 commit c6d3a68

2 files changed

Lines changed: 35 additions & 17 deletions

File tree

adapter/sqs_catalog_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,29 @@ func TestSQSServer_CatalogCreateIsIdempotent(t *testing.T) {
136136
if got, _ := out3["__type"].(string); got != sqsErrQueueNameExists {
137137
t.Fatalf("differing-attrs error type: got %q want %q", got, sqsErrQueueNameExists)
138138
}
139+
140+
// Fourth call: same name, same non-throttle attrs as the original
141+
// create, but different Throttle* values. The original create had
142+
// no Throttle config; this one adds one. throttleConfigEqual must
143+
// notice the diff and the call must reject as QueueNameExists.
144+
// Without this case a bug in throttleConfigEqual (e.g. always
145+
// returning true) would slip past the existing VisibilityTimeout-
146+
// only test (Claude review on PR #679 round 2).
147+
withThrottle := map[string]any{
148+
"QueueName": "idempotent",
149+
"Attributes": map[string]string{
150+
"VisibilityTimeout": "60",
151+
"ThrottleSendCapacity": "10",
152+
"ThrottleSendRefillPerSecond": "1",
153+
},
154+
}
155+
status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle)
156+
if status4 != http.StatusBadRequest {
157+
t.Fatalf("re-create with added Throttle*: got %d want 400; body %v", status4, out4)
158+
}
159+
if got, _ := out4["__type"].(string); got != sqsErrQueueNameExists {
160+
t.Fatalf("Throttle*-diff error type: got %q want %q", got, sqsErrQueueNameExists)
161+
}
139162
}
140163

141164
func TestSQSServer_CatalogGetAndSetAttributes(t *testing.T) {

adapter/sqs_throttle.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,13 @@ type tokenBucket struct {
105105
// bucketStore holds every active bucket for an SQS server process.
106106
// sync.Map matches the read-mostly access pattern: lookups are nearly
107107
// always Load hits; LoadOrStore pays the write cost only on first use.
108+
//
109+
// The idle-evict sweep runs from runSweepLoop on a background ticker
110+
// — there is no hot-path serialisation primitive because the only
111+
// caller of sweep() is the sole goroutine the ticker drives.
108112
type bucketStore struct {
109-
buckets sync.Map // map[bucketKey]*tokenBucket
110-
clock func() time.Time
111-
// sweepMu serialises the lazy idle-evict sweep so concurrent first
112-
// requests cannot all run a full sweep in parallel. The sweep itself
113-
// acquires per-bucket mu briefly, so it never blocks a lookup that
114-
// already has the bucket.
115-
sweepMu sync.Mutex
116-
lastSweep time.Time
113+
buckets sync.Map // map[bucketKey]*tokenBucket
114+
clock func() time.Time
117115
evictedAfter time.Duration
118116
sweepEvery time.Duration
119117
}
@@ -305,17 +303,14 @@ func (b *bucketStore) runSweepLoop(ctx context.Context) {
305303
}
306304

307305
// sweep walks the bucket store dropping any bucket idle longer than
308-
// evictedAfter. Called from runSweepLoop on a background ticker.
309-
// Bucket lookups are still O(1) on the hot path; sweep iterates
310-
// every entry under the per-bucket lock (held briefly for the
311-
// timestamp read) so it never blocks a charge() that already has
306+
// evictedAfter. Called from runSweepLoop on a background ticker —
307+
// the ticker is the only caller, so sweep() does not need its own
308+
// serialisation. Bucket lookups stay O(1) on the hot path; sweep
309+
// iterates every entry under the per-bucket lock (held briefly for
310+
// the timestamp read) so it never blocks a charge() that already has
312311
// the bucket.
313312
func (b *bucketStore) sweep() {
314-
now := b.clock()
315-
b.sweepMu.Lock()
316-
b.lastSweep = now
317-
b.sweepMu.Unlock()
318-
cutoff := now.Add(-b.evictedAfter)
313+
cutoff := b.clock().Add(-b.evictedAfter)
319314
b.buckets.Range(func(k, v any) bool {
320315
bucket, _ := v.(*tokenBucket)
321316
bucket.mu.Lock()

0 commit comments

Comments
 (0)