-
Notifications
You must be signed in to change notification settings - Fork 2
feat(sqs): per-queue throttling (Phase 3.C) #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
e105815
e3530e9
c6d3a68
af2d12b
29c7296
ad67145
efe7e56
0b41e0f
416b112
576cf8c
7d7e620
e9c20de
dbd7a66
dcc5574
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ import ( | |
| "context" | ||
| "io" | ||
| "log/slog" | ||
| "math" | ||
| "net/http" | ||
| "net/url" | ||
| "regexp" | ||
|
|
@@ -92,6 +93,42 @@ type sqsQueueMeta struct { | |
| // commit time and trust HLC monotonicity to keep ordering sane. | ||
| CreatedAtMillis int64 `json:"created_at_millis,omitempty"` | ||
| LastModifiedAtMillis int64 `json:"last_modified_at_millis,omitempty"` | ||
| // Throttle is the per-queue rate-limit configuration. nil disables | ||
| // throttling (default). Set via SetQueueAttributes with the AWS-style | ||
| // names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc. | ||
| // Persisted on the meta so a leader failover loads the configuration | ||
| // along with the rest of the queue. | ||
| Throttle *sqsQueueThrottle `json:"throttle,omitempty"` | ||
| } | ||
|
|
||
| // sqsQueueThrottle is the per-queue token-bucket configuration. Three | ||
| // independent buckets per queue: Send (SendMessage[Batch]), Recv | ||
| // (ReceiveMessage / DeleteMessage[Batch] / ChangeMessageVisibility[Batch], | ||
| // charged on the consumer side), Default (catch-all for any future | ||
| // non-Send/Recv verb that gets wired into the throttle path). | ||
| // | ||
| // Field-name vocabulary uses short forms (Send*, Recv*, Default*) for the | ||
| // JSON contract and AWS-style attribute names; the in-memory bucketKey | ||
| // uses the canonical action vocabulary ("Send" | "Receive" | "*"). | ||
| // throttleConfigToBucketAction and bucketActionForCharge bridge the two. | ||
| type sqsQueueThrottle struct { | ||
| SendCapacity float64 `json:"send_capacity,omitempty"` | ||
| SendRefillPerSecond float64 `json:"send_refill_per_second,omitempty"` | ||
| RecvCapacity float64 `json:"recv_capacity,omitempty"` | ||
| RecvRefillPerSecond float64 `json:"recv_refill_per_second,omitempty"` | ||
| DefaultCapacity float64 `json:"default_capacity,omitempty"` | ||
| DefaultRefillPerSecond float64 `json:"default_refill_per_second,omitempty"` | ||
| } | ||
|
|
||
| // IsEmpty reports whether the configuration is the no-op (all six | ||
| // fields zero), in which case throttling is disabled for the queue. | ||
| func (t *sqsQueueThrottle) IsEmpty() bool { | ||
| if t == nil { | ||
| return true | ||
| } | ||
| return t.SendCapacity == 0 && t.SendRefillPerSecond == 0 && | ||
| t.RecvCapacity == 0 && t.RecvRefillPerSecond == 0 && | ||
| t.DefaultCapacity == 0 && t.DefaultRefillPerSecond == 0 | ||
| } | ||
|
|
||
| var storedSQSMetaPrefix = []byte{0x00, 'S', 'Q', 0x01} | ||
|
|
@@ -384,6 +421,19 @@ var sqsAttributeAppliers = map[string]attributeApplier{ | |
| m.ContentBasedDedup = b | ||
| return nil | ||
| }, | ||
| // Throttle* are non-AWS extensions for per-queue rate limiting, | ||
| // see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md. | ||
| // Each accepts a non-negative float64; the cross-attribute | ||
| // validation that enforces both-zero-or-both-positive on each | ||
| // (capacity, refill) pair, capacity ≥ refill, hard ceiling, and | ||
| // the capacity ≥ 10 floor for batch-charging buckets runs in | ||
| // validateThrottleConfig after every Throttle* applier has fired. | ||
| "ThrottleSendCapacity": applyThrottleField(throttleSetSendCapacity), | ||
| "ThrottleSendRefillPerSecond": applyThrottleField(throttleSetSendRefill), | ||
| "ThrottleRecvCapacity": applyThrottleField(throttleSetRecvCapacity), | ||
| "ThrottleRecvRefillPerSecond": applyThrottleField(throttleSetRecvRefill), | ||
| "ThrottleDefaultCapacity": applyThrottleField(throttleSetDefaultCapacity), | ||
| "ThrottleDefaultRefillPerSecond": applyThrottleField(throttleSetDefaultRefill), | ||
| "RedrivePolicy": func(m *sqsQueueMeta, v string) error { | ||
| // Validate the policy at attribute-apply time so a malformed | ||
| // RedrivePolicy never makes it onto the queue meta record. The | ||
|
|
@@ -419,6 +469,156 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error { | |
| return err | ||
| } | ||
| } | ||
| // Throttle* validation has to run after every applier so the | ||
| // pair-wise rules (both-zero-or-both-positive, capacity ≥ refill, | ||
| // capacity ≥ 10 for batch buckets) see the post-update meta as a | ||
| // whole. Running per-applier would reject a valid two-attribute | ||
| // update (e.g. SendCapacity + SendRefillPerSecond) on the first | ||
| // applier because the second value is not yet present. | ||
| if err := validateThrottleConfig(meta); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // applyThrottleField wraps a setter that writes one Throttle* field | ||
| // into meta.Throttle, allocating the struct lazily on first use. The | ||
| // per-field setter does the float parse + non-negative + hard-ceiling | ||
| // check; cross-field rules run later in validateThrottleConfig. | ||
| func applyThrottleField(set func(*sqsQueueThrottle, float64)) attributeApplier { | ||
| return func(m *sqsQueueMeta, v string) error { | ||
| f, err := parseThrottleFloat(v) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if m.Throttle == nil { | ||
| m.Throttle = &sqsQueueThrottle{} | ||
| } | ||
| set(m.Throttle, f) | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // parseThrottleFloat parses the wire string into a non-negative float | ||
| // bounded by the hard ceiling. Any malformed or out-of-range value | ||
| // turns into InvalidAttributeValue with a self-describing message so | ||
| // the operator sees the cause without grepping the server log. | ||
| func parseThrottleFloat(value string) (float64, error) { | ||
| v := strings.TrimSpace(value) | ||
| f, err := strconv.ParseFloat(v, 64) | ||
| if err != nil { | ||
| return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| "throttle attribute must be a non-negative number") | ||
| } | ||
| if math.IsNaN(f) || math.IsInf(f, 0) || f < 0 { | ||
| return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| "throttle attribute must be finite and non-negative") | ||
| } | ||
| if f > throttleHardCeilingPerSecond { | ||
| return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| "throttle attribute exceeds hard ceiling 100000") | ||
| } | ||
| return f, nil | ||
| } | ||
|
|
||
| // Per-field setters keep applyThrottleField a one-liner per attribute | ||
| // and let validateThrottleConfig stay outside the applier dispatch | ||
| // table. Defined as functions (not closures) so a future caller from | ||
| // outside applyAttributes — e.g. a programmatic admin surface — can | ||
| // reuse them without recreating the closure boilerplate. | ||
| func throttleSetSendCapacity(t *sqsQueueThrottle, f float64) { t.SendCapacity = f } | ||
| func throttleSetSendRefill(t *sqsQueueThrottle, f float64) { t.SendRefillPerSecond = f } | ||
| func throttleSetRecvCapacity(t *sqsQueueThrottle, f float64) { t.RecvCapacity = f } | ||
| func throttleSetRecvRefill(t *sqsQueueThrottle, f float64) { t.RecvRefillPerSecond = f } | ||
| func throttleSetDefaultCapacity(t *sqsQueueThrottle, f float64) { t.DefaultCapacity = f } | ||
| func throttleSetDefaultRefill(t *sqsQueueThrottle, f float64) { t.DefaultRefillPerSecond = f } | ||
|
|
||
| // validateThrottleConfig enforces the §3.2 cross-attribute rules on | ||
| // the post-applier meta. The single-field constraints (non-negative, | ||
| // hard ceiling) are already enforced inside parseThrottleFloat; | ||
| // what's left is pair-wise: | ||
| // | ||
| // - Each (capacity, refill) pair must be both zero (action disabled) | ||
| // or both positive. A capacity-without-refill bucket would never | ||
| // refill; a refill-without-capacity bucket has no burst headroom. | ||
| // - capacity ≥ refill, otherwise the bucket can never burst above | ||
| // steady state (the bucket can only ever hold one second's worth). | ||
| // - For action buckets that cover a batch verb (Send, Recv) the | ||
| // capacity must be ≥ throttleMinBatchCapacity (== 10). A capacity | ||
| // below the largest single charge is permanently unserviceable | ||
| // for full batches. | ||
| // | ||
| // If meta.Throttle is empty (the IsEmpty short-circuit) the function | ||
| // also drops the empty struct so a round-trip GetQueueAttributes | ||
| // reports the queue as untrothttled rather than zero-valued. Mirrors | ||
| // how nil throttle on the meta means "not configured". | ||
| func validateThrottleConfig(meta *sqsQueueMeta) error { | ||
| if meta.Throttle == nil { | ||
| return nil | ||
| } | ||
| t := meta.Throttle | ||
| if err := validateThrottlePair("ThrottleSend", t.SendCapacity, t.SendRefillPerSecond, true); err != nil { | ||
| return err | ||
| } | ||
| if err := validateThrottlePair("ThrottleRecv", t.RecvCapacity, t.RecvRefillPerSecond, true); err != nil { | ||
| return err | ||
| } | ||
| // Default* covers any future non-Send/Recv verb that gets wired | ||
| // into the throttle path — no batch verb is in scope today, so the | ||
| // minimum-batch-capacity check is off for this bucket. | ||
| if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, false); err != nil { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| return err | ||
| } | ||
| if t.IsEmpty() { | ||
| // All-zero post-apply means the operator wrote a "disable" | ||
| // command; canonicalise to nil so downstream code hits the | ||
| // nil-throttle short-circuit rather than the IsEmpty branch. | ||
| meta.Throttle = nil | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // addThrottleAttributes renders the non-zero Throttle* pairs into out. | ||
| // Per §3.2 the wire-side vocabulary stays Send*/Recv*/Default*; the | ||
| // canonical bucket-action vocabulary is internal to the bucket store. | ||
| func addThrottleAttributes(out map[string]string, t *sqsQueueThrottle) { | ||
| if t.IsEmpty() { | ||
| return | ||
| } | ||
| if t.SendCapacity > 0 { | ||
| out["ThrottleSendCapacity"] = strconv.FormatFloat(t.SendCapacity, 'g', -1, 64) | ||
| out["ThrottleSendRefillPerSecond"] = strconv.FormatFloat(t.SendRefillPerSecond, 'g', -1, 64) | ||
| } | ||
| if t.RecvCapacity > 0 { | ||
| out["ThrottleRecvCapacity"] = strconv.FormatFloat(t.RecvCapacity, 'g', -1, 64) | ||
| out["ThrottleRecvRefillPerSecond"] = strconv.FormatFloat(t.RecvRefillPerSecond, 'g', -1, 64) | ||
| } | ||
| if t.DefaultCapacity > 0 { | ||
| out["ThrottleDefaultCapacity"] = strconv.FormatFloat(t.DefaultCapacity, 'g', -1, 64) | ||
| out["ThrottleDefaultRefillPerSecond"] = strconv.FormatFloat(t.DefaultRefillPerSecond, 'g', -1, 64) | ||
| } | ||
| } | ||
|
|
||
| // validateThrottlePair runs the per-(action, capacity, refill) checks. | ||
| // requireBatchCapacity gates the capacity ≥ 10 rule so the catch-all | ||
| // Default* bucket (no batch verbs in scope today) does not get the | ||
| // extra constraint. | ||
| func validateThrottlePair(prefix string, capacity, refill float64, requireBatchCapacity bool) error { | ||
| if capacity == 0 && refill == 0 { | ||
| return nil | ||
| } | ||
| if capacity == 0 || refill == 0 { | ||
| return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| prefix+"Capacity and "+prefix+"RefillPerSecond must both be zero (disabled) or both positive") | ||
| } | ||
| if capacity < refill { | ||
| return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| prefix+"Capacity must be ≥ "+prefix+"RefillPerSecond (capacity is the burst cap; below refill the bucket cannot accumulate)") | ||
| } | ||
| if requireBatchCapacity && capacity < throttleMinBatchCapacity { | ||
| return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, | ||
| prefix+"Capacity must be ≥ 10 — batch verbs (SendMessageBatch / DeleteMessageBatch) charge up to 10 tokens per call; a smaller capacity makes every full batch permanently unserviceable") | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -440,6 +640,13 @@ func attributesEqual(a, b *sqsQueueMeta) bool { | |
| if a == nil || b == nil { | ||
| return false | ||
| } | ||
| return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle) | ||
| } | ||
|
|
||
| // baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set. | ||
| // Split from attributesEqual so adding fields per phase does not | ||
| // push the function over the cyclop ceiling. | ||
| func baseAttributesEqual(a, b *sqsQueueMeta) bool { | ||
| return a.IsFIFO == b.IsFIFO && | ||
| a.ContentBasedDedup == b.ContentBasedDedup && | ||
| a.VisibilityTimeoutSeconds == b.VisibilityTimeoutSeconds && | ||
|
|
@@ -450,6 +657,27 @@ func attributesEqual(a, b *sqsQueueMeta) bool { | |
| a.RedrivePolicy == b.RedrivePolicy | ||
| } | ||
|
|
||
| // throttleConfigEqual compares two Throttle configs for the | ||
| // CreateQueue idempotency check. Without including the throttle | ||
| // fields in attributesEqual, a re-create with different limits would | ||
| // be treated as idempotent and silently keep the old limits. | ||
| func throttleConfigEqual(a, b *sqsQueueThrottle) bool { | ||
| aEmpty := a.IsEmpty() | ||
| bEmpty := b.IsEmpty() | ||
| if aEmpty && bEmpty { | ||
| return true | ||
| } | ||
| if aEmpty != bEmpty { | ||
| return false | ||
| } | ||
| return a.SendCapacity == b.SendCapacity && | ||
| a.SendRefillPerSecond == b.SendRefillPerSecond && | ||
| a.RecvCapacity == b.RecvCapacity && | ||
| a.RecvRefillPerSecond == b.RecvRefillPerSecond && | ||
| a.DefaultCapacity == b.DefaultCapacity && | ||
| a.DefaultRefillPerSecond == b.DefaultRefillPerSecond | ||
| } | ||
|
|
||
| // ------------------------ storage primitives ------------------------ | ||
|
|
||
| func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 { | ||
|
|
@@ -623,6 +851,14 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) { | |
| writeSQSErrorFromErr(w, err) | ||
| return | ||
| } | ||
| // Drop in-memory throttle buckets belonging to this queue so a | ||
| // same-name CreateQueue immediately after this delete starts with | ||
| // a fresh full-capacity bucket, not the stale balance from the | ||
| // previous incarnation. Without this step the old throttle would | ||
| // keep enforcing for up to the idle-evict window (default 1 h), | ||
| // surprising operators who use DeleteQueue+CreateQueue to reset | ||
| // queue state. | ||
| s.throttle.invalidateQueue(name) | ||
| // SQS DeleteQueue returns 200 with an empty body. | ||
| writeSQSJSON(w, map[string]any{}) | ||
| } | ||
|
|
@@ -954,6 +1190,12 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection, | |
| if meta.RedrivePolicy != "" { | ||
| all["RedrivePolicy"] = meta.RedrivePolicy | ||
| } | ||
| // Throttle* are non-AWS extensions. Surfacing them in | ||
| // GetQueueAttributes lets operators read back what they set; SDKs | ||
| // that strictly validate the attribute set will ignore unknown | ||
| // keys. Extracted into a helper so queueMetaToAttributes stays | ||
| // under the cyclop ceiling. | ||
| addThrottleAttributes(all, meta.Throttle) | ||
| if selection.expandAll { | ||
| return all | ||
| } | ||
|
|
@@ -989,6 +1231,20 @@ func (s *SQSServer) setQueueAttributes(w http.ResponseWriter, r *http.Request) { | |
| writeSQSErrorFromErr(w, err) | ||
| return | ||
| } | ||
| // Drop the in-memory bucket entries belonging to this queue *after* | ||
| // the Raft commit so the next request rebuilds from the freshly | ||
| // committed throttle config. Gated on whether the request actually | ||
| // touched a Throttle* attribute — an unconditional invalidate | ||
| // would reset the bucket on every unrelated SetQueueAttributes | ||
| // (e.g. VisibilityTimeout-only update), giving any caller a way to | ||
| // silently restore a noisy tenant's burst capacity by writing a | ||
| // no-op SetQueueAttributes (Codex P1 on PR #679). The bucket | ||
| // reconciliation in loadOrInit also catches a stale bucket if a | ||
| // throttle change slips past this gate (e.g. via a future admin | ||
| // path), so the gating here is purely a hot-path optimisation. | ||
| if throttleAttributesPresent(in.Attributes) { | ||
| s.throttle.invalidateQueue(name) | ||
| } | ||
| writeSQSJSON(w, map[string]any{}) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throttle configuration was added to queue metadata, but
CreateQueueidempotency still relies onattributesEqual, which does not includeThrottle*fields. As a result, creating an existing queue with different throttle settings can be treated as “same attributes” and incorrectly return success instead ofQueueNameExists, hiding configuration drift during provisioning.Useful? React with 👍 / 👎.