Commit e105815

feat(sqs): per-queue throttling (Phase 3.C)
Implements the design from docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.

Schema (sqs_catalog.go):
- sqsQueueMeta gains an optional Throttle *sqsQueueThrottle field with six float64 sub-fields (Send/Recv/Default x Capacity/RefillPerSecond).
- Six new ThrottleSendCapacity / ThrottleSendRefillPerSecond / ThrottleRecvCapacity / ThrottleRecvRefillPerSecond / ThrottleDefaultCapacity / ThrottleDefaultRefillPerSecond attributes go through the standard sqsAttributeAppliers dispatch.
- validateThrottleConfig enforces the cross-field rules from the design: each (capacity, refill) pair must be both-zero or both-positive; capacity must be >= refill so the bucket can burst; Send/Recv capacities must be >= 10 (the SendMessageBatch / DeleteMessageBatch max charge) so a small capacity does not make every full batch permanently unserviceable; per-field hard ceiling of 100k. Validator runs once at the end of applyAttributes so a multi-field update sees the post-apply state as a whole.
- queueMetaToAttributes surfaces the configured Throttle* fields so GetQueueAttributes("All") round-trips. Extracted into addThrottleAttributes to stay under the cyclop ceiling.

Bucket store (sqs_throttle.go, new):
- bucketStore: sync.Map[bucketKey]*tokenBucket; per-bucket sync.Mutex on the hot path so cross-queue traffic never serialises on a process-wide lock. Per Gemini medium on PR #664 the single-mutex alternative was rejected. Lazy idle-evict sweep removes inactive buckets after 1h to bound memory.
- charge(): lock-free Load, LoadOrStore on miss (race-tolerant -- both racers compute identical capacity/refill from the same meta), then per-bucket lock for refill + take + release. Refill is elapsed * refillRate, capped at capacity. On reject, computes Retry-After from the actual refillRate and the requestedCount per the §3.4 formula (numerator is requested count, not 1, so a batch verb does not get told to retry in 1s when it really needs 10s).
- resolveActionConfig() handles the Default* fall-through: when only DefaultCapacity is set, Send and Recv requests share the same bucketKey{action:"*"} so Default behaves as one shared cap rather than three independent quotas.
- invalidateQueue() drops every bucket belonging to a queue. Called after the Raft commit on SetQueueAttributes / DeleteQueue so new limits take effect on the very next request, not after the 1h idle-evict sweep. Without this step the §3.1 cache-invalidation contract fails and operators see stale enforcement.

Charging (sqs_messages.go, sqs_messages_batch.go):
- All seven message-plane handlers wired: SendMessage / SendMessageBatch (Send bucket; batch charges by entry count), ReceiveMessage / DeleteMessage / DeleteMessageBatch / ChangeMessageVisibility / ChangeMessageVisibilityBatch (Receive bucket).
- Throttle check sits OUTSIDE the OCC retry loop per §4.2 -- a rejected request never reaches the coordinator, so the existing sendMessageWithRetry et al. cannot busy-loop on a permanent rate-limit failure.
- sendMessage extracted into prepareSendMessage + validateSend + body to stay under the cyclop ceiling once the throttle branch was added.

Error envelope (sqs.go):
- New sqsErrThrottling code + writeSQSThrottlingError helper that writes 400 + AWS-shaped JSON body + x-amzn-ErrorType + Retry-After header. The envelope is the same shape AWS uses; SDKs that key off x-amzn-ErrorType handle it without changes.

Tests:
- adapter/sqs_throttle_test.go: 18 unit tests covering bucket math (fresh capacity, refill elapsed, refill cap, batch reject preserves partial credit, Retry-After uses requested count, sub-1-RPS floor, per-action / per-queue / Default-fallthrough isolation, invalidateQueue, concurrent -race, default-off short-circuit) and the validator (nil/empty canonicalisation, both-zero-or-both-positive, capacity >= 10 batch floor, Default* exempt, capacity >= refill, parseThrottleFloat range checks, computeRetryAfter floor).
- adapter/sqs_throttle_integration_test.go: 8 end-to-end tests covering the §6 testing strategy (default-off allows unbounded, send/recv reject after capacity with correct envelope + Retry-After, batch charges by entry count, SetQueueAttributes invalidation, DeleteQueue + CreateQueue lifecycle invalidation, GetQueueAttributes round-trip, validator rejects below batch min).

Run with -race clean.

Out of scope for this PR (deferred to a follow-up):
- Prometheus counter (sqs_throttled_requests_total) and gauge (sqs_throttle_tokens_remaining) per the design's §4.1 monitoring/registry.go entry. The wiring needs a new prometheus collector and hooking through the Registry seam; punted to keep this PR focused on the data-plane behaviour. The bucket store's chargeOutcome already exposes tokensAfter so the gauge wiring is one site.
- §6.5 cross-protocol (Query) parity test: the Query protocol layer lives on PR #662 and the throttle envelope already has the same shape on both protocols (writeSQSError handles both), so the test is a follow-up after #662 lands.
- §6.6 failover behaviour test: the §3.1 "fresh bucket on failover" contract is implementation-correct (the bucket map is per-process, no Raft state) but a 3-node failover test needs the cluster scaffolding and is best added with the Jepsen workload.
1 parent d246cc3 commit e105815

7 files changed

Lines changed: 1390 additions & 6 deletions

adapter/sqs.go

Lines changed: 28 additions & 0 deletions
@@ -5,6 +5,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"strconv"
 	"time"

 	"github.com/bootjp/elastickv/kv"
@@ -55,6 +56,11 @@ const (
 	sqsErrInternalFailure = "InternalFailure"
 	sqsErrServiceUnavailable = "ServiceUnavailable"
 	sqsErrMalformedRequest = "MalformedQueryString"
+	// sqsErrThrottling is the per-queue rate-limit rejection code.
+	// Returned with HTTP 400 and a Retry-After header derived from the
+	// bucket's refillRate + the request's charge count (see
+	// computeRetryAfter in sqs_throttle.go for the formula).
+	sqsErrThrottling = "Throttling"
 )

 type SQSServerOption func(*SQSServer)
@@ -76,6 +82,11 @@ type SQSServer struct {
 	// goroutines without ordering between them.
 	reaperCtx context.Context
 	reaperCancel context.CancelFunc
+	// throttle is the per-queue rate-limit bucket store. Always
+	// non-nil; charge() short-circuits when the queue's meta has no
+	// throttle config so unconfigured queues pay one nil-check per
+	// request and nothing else (see sqs_throttle.go).
+	throttle *bucketStore
 }

 // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
@@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
 		coordinator: coordinate,
 		reaperCtx: reaperCtx,
 		reaperCancel: reaperCancel,
+		throttle: newBucketStoreDefault(),
 	}
 	s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
 		sqsCreateQueueTarget: s.createQueue,
@@ -267,3 +279,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin
 	w.WriteHeader(status)
 	_ = json.NewEncoder(w).Encode(resp)
 }
+
+// writeSQSThrottlingError emits the rate-limit rejection envelope: 400
+// + the AWS-shaped JSON error body + a Retry-After header carrying
+// the integer-second wait derived from the bucket's refill rate and
+// the request's charge count. The action argument is the bucket-action
+// vocabulary ("Send" | "Receive" | "*") so the operator-visible
+// message names the bucket that ran out, not just the queue.
+func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) {
+	if retryAfter < time.Second {
+		retryAfter = time.Second
+	}
+	secs := int(retryAfter / time.Second)
+	w.Header().Set("Retry-After", strconv.Itoa(secs))
+	writeSQSError(w, http.StatusBadRequest, sqsErrThrottling,
+		"Rate exceeded for queue '"+queue+"' action '"+action+"'")
+}

adapter/sqs_catalog.go

Lines changed: 223 additions & 0 deletions
@@ -5,6 +5,7 @@ import (
 	"context"
 	"io"
 	"log/slog"
+	"math"
 	"net/http"
 	"net/url"
 	"regexp"
@@ -92,6 +93,42 @@ type sqsQueueMeta struct {
 	// commit time and trust HLC monotonicity to keep ordering sane.
 	CreatedAtMillis int64 `json:"created_at_millis,omitempty"`
 	LastModifiedAtMillis int64 `json:"last_modified_at_millis,omitempty"`
+	// Throttle is the per-queue rate-limit configuration. nil disables
+	// throttling (default). Set via SetQueueAttributes with the AWS-style
+	// names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
+	// Persisted on the meta so a leader failover loads the configuration
+	// along with the rest of the queue.
+	Throttle *sqsQueueThrottle `json:"throttle,omitempty"`
+}
+
+// sqsQueueThrottle is the per-queue token-bucket configuration. Three
+// independent buckets per queue: Send (SendMessage[Batch]), Recv
+// (ReceiveMessage / DeleteMessage[Batch] / ChangeMessageVisibility[Batch],
+// charged on the consumer side), Default (catch-all for any future
+// non-Send/Recv verb that gets wired into the throttle path).
+//
+// Field-name vocabulary uses short forms (Send*, Recv*, Default*) for the
+// JSON contract and AWS-style attribute names; the in-memory bucketKey
+// uses the canonical action vocabulary ("Send" | "Receive" | "*").
+// throttleConfigToBucketAction and bucketActionForCharge bridge the two.
+type sqsQueueThrottle struct {
+	SendCapacity float64 `json:"send_capacity,omitempty"`
+	SendRefillPerSecond float64 `json:"send_refill_per_second,omitempty"`
+	RecvCapacity float64 `json:"recv_capacity,omitempty"`
+	RecvRefillPerSecond float64 `json:"recv_refill_per_second,omitempty"`
+	DefaultCapacity float64 `json:"default_capacity,omitempty"`
+	DefaultRefillPerSecond float64 `json:"default_refill_per_second,omitempty"`
+}
+
+// IsEmpty reports whether the configuration is the no-op (all six
+// fields zero), in which case throttling is disabled for the queue.
+func (t *sqsQueueThrottle) IsEmpty() bool {
+	if t == nil {
+		return true
+	}
+	return t.SendCapacity == 0 && t.SendRefillPerSecond == 0 &&
+		t.RecvCapacity == 0 && t.RecvRefillPerSecond == 0 &&
+		t.DefaultCapacity == 0 && t.DefaultRefillPerSecond == 0
 }

 var storedSQSMetaPrefix = []byte{0x00, 'S', 'Q', 0x01}
@@ -384,6 +421,19 @@ var sqsAttributeAppliers = map[string]attributeApplier{
 		m.ContentBasedDedup = b
 		return nil
 	},
+	// Throttle* are non-AWS extensions for per-queue rate limiting,
+	// see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
+	// Each accepts a non-negative float64; the cross-attribute
+	// validation that enforces both-zero-or-both-positive on each
+	// (capacity, refill) pair, capacity ≥ refill, hard ceiling, and
+	// the capacity ≥ 10 floor for batch-charging buckets runs in
+	// validateThrottleConfig after every Throttle* applier has fired.
+	"ThrottleSendCapacity": applyThrottleField(throttleSetSendCapacity),
+	"ThrottleSendRefillPerSecond": applyThrottleField(throttleSetSendRefill),
+	"ThrottleRecvCapacity": applyThrottleField(throttleSetRecvCapacity),
+	"ThrottleRecvRefillPerSecond": applyThrottleField(throttleSetRecvRefill),
+	"ThrottleDefaultCapacity": applyThrottleField(throttleSetDefaultCapacity),
+	"ThrottleDefaultRefillPerSecond": applyThrottleField(throttleSetDefaultRefill),
 	"RedrivePolicy": func(m *sqsQueueMeta, v string) error {
 		// Validate the policy at attribute-apply time so a malformed
 		// RedrivePolicy never makes it onto the queue meta record. The
@@ -419,6 +469,156 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
 			return err
 		}
 	}
+	// Throttle* validation has to run after every applier so the
+	// pair-wise rules (both-zero-or-both-positive, capacity ≥ refill,
+	// capacity ≥ 10 for batch buckets) see the post-update meta as a
+	// whole. Running per-applier would reject a valid two-attribute
+	// update (e.g. SendCapacity + SendRefillPerSecond) on the first
+	// applier because the second value is not yet present.
+	if err := validateThrottleConfig(meta); err != nil {
+		return err
+	}
+	return nil
+}
+
+// applyThrottleField wraps a setter that writes one Throttle* field
+// into meta.Throttle, allocating the struct lazily on first use. The
+// per-field setter does the float parse + non-negative + hard-ceiling
+// check; cross-field rules run later in validateThrottleConfig.
+func applyThrottleField(set func(*sqsQueueThrottle, float64)) attributeApplier {
+	return func(m *sqsQueueMeta, v string) error {
+		f, err := parseThrottleFloat(v)
+		if err != nil {
+			return err
+		}
+		if m.Throttle == nil {
+			m.Throttle = &sqsQueueThrottle{}
+		}
+		set(m.Throttle, f)
+		return nil
+	}
+}
+
+// parseThrottleFloat parses the wire string into a non-negative float
+// bounded by the hard ceiling. Any malformed or out-of-range value
+// turns into InvalidAttributeValue with a self-describing message so
+// the operator sees the cause without grepping the server log.
+func parseThrottleFloat(value string) (float64, error) {
+	v := strings.TrimSpace(value)
+	f, err := strconv.ParseFloat(v, 64)
+	if err != nil {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute must be a non-negative number")
+	}
+	if math.IsNaN(f) || math.IsInf(f, 0) || f < 0 {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute must be finite and non-negative")
+	}
+	if f > throttleHardCeilingPerSecond {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute exceeds hard ceiling 100000")
+	}
+	return f, nil
+}
+
+// Per-field setters keep applyThrottleField a one-liner per attribute
+// and let validateThrottleConfig stay outside the applier dispatch
+// table. Defined as functions (not closures) so a future caller from
+// outside applyAttributes — e.g. a programmatic admin surface — can
+// reuse them without recreating the closure boilerplate.
+func throttleSetSendCapacity(t *sqsQueueThrottle, f float64) { t.SendCapacity = f }
+func throttleSetSendRefill(t *sqsQueueThrottle, f float64) { t.SendRefillPerSecond = f }
+func throttleSetRecvCapacity(t *sqsQueueThrottle, f float64) { t.RecvCapacity = f }
+func throttleSetRecvRefill(t *sqsQueueThrottle, f float64) { t.RecvRefillPerSecond = f }
+func throttleSetDefaultCapacity(t *sqsQueueThrottle, f float64) { t.DefaultCapacity = f }
+func throttleSetDefaultRefill(t *sqsQueueThrottle, f float64) { t.DefaultRefillPerSecond = f }
+
+// validateThrottleConfig enforces the §3.2 cross-attribute rules on
+// the post-applier meta. The single-field constraints (non-negative,
+// hard ceiling) are already enforced inside parseThrottleFloat;
+// what's left is pair-wise:
+//
+//   - Each (capacity, refill) pair must be both zero (action disabled)
+//     or both positive. A capacity-without-refill bucket would never
+//     refill; a refill-without-capacity bucket has no burst headroom.
+//   - capacity ≥ refill, otherwise the bucket can never burst above
+//     steady state (the bucket can only ever hold one second's worth).
+//   - For action buckets that cover a batch verb (Send, Recv) the
+//     capacity must be ≥ throttleMinBatchCapacity (== 10). A capacity
+//     below the largest single charge is permanently unserviceable
+//     for full batches.
+//
+// If meta.Throttle is empty (the IsEmpty short-circuit) the function
+// also drops the empty struct so a round-trip GetQueueAttributes
+// reports the queue as unthrottled rather than zero-valued. Mirrors
+// how nil throttle on the meta means "not configured".
+func validateThrottleConfig(meta *sqsQueueMeta) error {
+	if meta.Throttle == nil {
+		return nil
+	}
+	t := meta.Throttle
+	if err := validateThrottlePair("ThrottleSend", t.SendCapacity, t.SendRefillPerSecond, true); err != nil {
+		return err
+	}
+	if err := validateThrottlePair("ThrottleRecv", t.RecvCapacity, t.RecvRefillPerSecond, true); err != nil {
+		return err
+	}
+	// Default* covers any future non-Send/Recv verb that gets wired
+	// into the throttle path — no batch verb is in scope today, so the
+	// minimum-batch-capacity check is off for this bucket.
+	if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, false); err != nil {
+		return err
+	}
+	if t.IsEmpty() {
+		// All-zero post-apply means the operator wrote a "disable"
+		// command; canonicalise to nil so downstream code hits the
+		// nil-throttle short-circuit rather than the IsEmpty branch.
+		meta.Throttle = nil
+	}
+	return nil
+}
+
+// addThrottleAttributes renders the non-zero Throttle* pairs into out.
+// Per §3.2 the wire-side vocabulary stays Send*/Recv*/Default*; the
+// canonical bucket-action vocabulary is internal to the bucket store.
+func addThrottleAttributes(out map[string]string, t *sqsQueueThrottle) {
+	if t.IsEmpty() {
+		return
+	}
+	if t.SendCapacity > 0 {
+		out["ThrottleSendCapacity"] = strconv.FormatFloat(t.SendCapacity, 'g', -1, 64)
+		out["ThrottleSendRefillPerSecond"] = strconv.FormatFloat(t.SendRefillPerSecond, 'g', -1, 64)
+	}
+	if t.RecvCapacity > 0 {
+		out["ThrottleRecvCapacity"] = strconv.FormatFloat(t.RecvCapacity, 'g', -1, 64)
+		out["ThrottleRecvRefillPerSecond"] = strconv.FormatFloat(t.RecvRefillPerSecond, 'g', -1, 64)
+	}
+	if t.DefaultCapacity > 0 {
+		out["ThrottleDefaultCapacity"] = strconv.FormatFloat(t.DefaultCapacity, 'g', -1, 64)
+		out["ThrottleDefaultRefillPerSecond"] = strconv.FormatFloat(t.DefaultRefillPerSecond, 'g', -1, 64)
+	}
+}
+
+// validateThrottlePair runs the per-(action, capacity, refill) checks.
+// requireBatchCapacity gates the capacity ≥ 10 rule so the catch-all
+// Default* bucket (no batch verbs in scope today) does not get the
+// extra constraint.
+func validateThrottlePair(prefix string, capacity, refill float64, requireBatchCapacity bool) error {
+	if capacity == 0 && refill == 0 {
+		return nil
+	}
+	if capacity == 0 || refill == 0 {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity and "+prefix+"RefillPerSecond must both be zero (disabled) or both positive")
+	}
+	if capacity < refill {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity must be ≥ "+prefix+"RefillPerSecond (capacity is the burst cap; below refill the bucket cannot accumulate)")
+	}
+	if requireBatchCapacity && capacity < throttleMinBatchCapacity {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity must be ≥ 10 — batch verbs (SendMessageBatch / DeleteMessageBatch) charge up to 10 tokens per call; a smaller capacity makes every full batch permanently unserviceable")
+	}
 	return nil
 }
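
As a quick reference for which (capacity, refill) combinations the pair-wise rules accept, the following sketch restates validateThrottlePair with plain errors and runs it over a few representative inputs. `validatePair` and `minBatchCapacity` are names invented for this example; the rule order mirrors the function in the diff, and the constant mirrors throttleMinBatchCapacity (== 10) as stated in the comments.

```go
package main

import (
	"errors"
	"fmt"
)

// minBatchCapacity mirrors throttleMinBatchCapacity (== 10) from the diff.
const minBatchCapacity = 10

// validatePair is a simplified restatement of validateThrottlePair
// that returns plain errors instead of SQS API errors.
func validatePair(capacity, refill float64, requireBatch bool) error {
	switch {
	case capacity == 0 && refill == 0:
		return nil // pair disabled
	case capacity == 0 || refill == 0:
		return errors.New("capacity and refill must both be zero or both positive")
	case capacity < refill:
		return errors.New("capacity must be >= refill")
	case requireBatch && capacity < minBatchCapacity:
		return errors.New("capacity must be >= 10 for batch-charging buckets")
	}
	return nil
}

func main() {
	cases := []struct {
		name             string
		capacity, refill float64
		requireBatch     bool
	}{
		{"disabled", 0, 0, true},
		{"capacity without refill", 100, 0, true},
		{"capacity below refill", 5, 50, true},
		{"below batch floor (Send/Recv)", 5, 1, true},
		{"below batch floor (Default)", 5, 1, false},
		{"typical", 100, 20, true},
	}
	for _, c := range cases {
		err := validatePair(c.capacity, c.refill, c.requireBatch)
		fmt.Printf("%-30s accepted=%v\n", c.name, err == nil)
	}
}
```

The fifth case shows the Default* exemption: the same numbers that fail for Send or Recv pass when the batch floor is not required.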

@@ -623,6 +823,14 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	// Drop in-memory throttle buckets belonging to this queue so a
+	// same-name CreateQueue immediately after this delete starts with
+	// a fresh full-capacity bucket, not the stale balance from the
+	// previous incarnation. Without this step the old throttle would
+	// keep enforcing for up to the idle-evict window (default 1 h),
+	// surprising operators who use DeleteQueue+CreateQueue to reset
+	// queue state.
+	s.throttle.invalidateQueue(name)
 	// SQS DeleteQueue returns 200 with an empty body.
 	writeSQSJSON(w, map[string]any{})
 }
@@ -954,6 +1162,12 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection,
 	if meta.RedrivePolicy != "" {
 		all["RedrivePolicy"] = meta.RedrivePolicy
 	}
+	// Throttle* are non-AWS extensions. Surfacing them in
+	// GetQueueAttributes lets operators read back what they set; SDKs
+	// that strictly validate the attribute set will ignore unknown
+	// keys. Extracted into a helper so queueMetaToAttributes stays
+	// under the cyclop ceiling.
+	addThrottleAttributes(all, meta.Throttle)
 	if selection.expandAll {
 		return all
 	}
@@ -989,6 +1203,15 @@ func (s *SQSServer) setQueueAttributes(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	// Drop the in-memory bucket entries belonging to this queue *after*
+	// the Raft commit so the next request rebuilds from the freshly
+	// committed throttle config. Without this step the old limits keep
+	// being enforced until the idle-evict sweep removes the stale
+	// entry — defeating the operator's intent to throttle a noisy
+	// tenant in real time. The LoadOrStore race a concurrent in-flight
+	// request might run with the stale bucket is benign: the rebuilt
+	// bucket starts at full capacity, same as failover semantics.
+	s.throttle.invalidateQueue(name)
 	writeSQSJSON(w, map[string]any{})
 }
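
The invalidation calls in deleteQueue and setQueueAttributes rely on a bucket store that lives in sqs_throttle.go, which is not shown on this page. The sketch below is one plausible shape for that behaviour, based only on the commit message (a sync.Map keyed by queue and action, Load then LoadOrStore on miss, and an invalidateQueue that drops every bucket for a queue). The `store` and `bucket` types, the `get` helper, and the `bucketKey` field names are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// bucketKey follows the key shape named in the commit message;
// the exact field names are assumed.
type bucketKey struct {
	queue  string
	action string // "Send" | "Receive" | "*"
}

// bucket holds the token balance behind its own mutex so traffic on
// different queues never contends on one shared lock.
type bucket struct {
	mu     sync.Mutex
	tokens float64
}

// store is a hypothetical stand-in for bucketStore.
type store struct {
	buckets sync.Map // bucketKey -> *bucket
}

// get returns the bucket for key, creating it at full capacity on a
// miss. If two goroutines race on the miss, LoadOrStore keeps one
// bucket and both callers receive that same pointer.
func (s *store) get(key bucketKey, capacity float64) *bucket {
	if v, ok := s.buckets.Load(key); ok {
		return v.(*bucket)
	}
	v, _ := s.buckets.LoadOrStore(key, &bucket{tokens: capacity})
	return v.(*bucket)
}

// invalidateQueue deletes every bucket that belongs to the queue so
// the next request rebuilds it from the current configuration.
func (s *store) invalidateQueue(queue string) {
	s.buckets.Range(func(k, _ any) bool {
		if k.(bucketKey).queue == queue {
			s.buckets.Delete(k)
		}
		return true
	})
}

func main() {
	s := &store{}
	key := bucketKey{queue: "orders", action: "Send"}

	old := s.get(key, 10)
	old.mu.Lock()
	old.tokens = 0 // simulate a bucket drained by traffic
	old.mu.Unlock()

	s.invalidateQueue("orders")

	fresh := s.get(key, 10)
	fmt.Println("rebuilt:", fresh != old, "tokens:", fresh.tokens)
}
```

A request that already holds the old bucket pointer can still finish against it after invalidation, which matches the benign race described in the setQueueAttributes comment.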