Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"time"

"github.com/bootjp/elastickv/kv"
Expand Down Expand Up @@ -55,6 +56,11 @@ const (
sqsErrInternalFailure = "InternalFailure"
sqsErrServiceUnavailable = "ServiceUnavailable"
sqsErrMalformedRequest = "MalformedQueryString"
// sqsErrThrottling is the per-queue rate-limit rejection code.
// Returned with HTTP 400 and a Retry-After header derived from the
// bucket's refillRate + the request's charge count (see
// computeRetryAfter in sqs_throttle.go for the formula).
sqsErrThrottling = "Throttling"
)

type SQSServerOption func(*SQSServer)
Expand All @@ -76,6 +82,11 @@ type SQSServer struct {
// goroutines without ordering between them.
reaperCtx context.Context
reaperCancel context.CancelFunc
// throttle is the per-queue rate-limit bucket store. Always
// non-nil; charge() short-circuits when the queue's meta has no
// throttle config so unconfigured queues pay one nil-check per
// request and nothing else (see sqs_throttle.go).
throttle *bucketStore
}

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
Expand All @@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
coordinator: coordinate,
reaperCtx: reaperCtx,
reaperCancel: reaperCancel,
throttle: newBucketStoreDefault(),
}
s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
sqsCreateQueueTarget: s.createQueue,
Expand Down Expand Up @@ -131,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin

func (s *SQSServer) Run() error {
s.startReaper(s.reaperCtx)
// Throttle bucket idle-evict runs on a background ticker so the
// request hot path never pays the O(N) sweep cost. Cleaned up by
// the same reaperCtx cancellation that stops the message reaper.
go s.throttle.runSweepLoop(s.reaperCtx)
if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -267,3 +283,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(resp)
}

// writeSQSThrottlingError emits the rate-limit rejection envelope: HTTP
// 400, the AWS-shaped JSON error body carrying the sqsErrThrottling
// code, and a Retry-After header expressed in whole seconds.
//
// queue and action are interpolated into the error message. The action
// argument is the bucket-action vocabulary ("Send" | "Receive" | "*")
// so the operator-visible message names the bucket that ran out, not
// just the queue.
//
// retryAfter is the wait computed by the caller. It is clamped to a
// minimum of one second and then rounded UP to the next whole second:
// truncating (e.g. 1.5s -> "1") would invite the client back before the
// wait has elapsed, only to be throttled again.
func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) {
	if retryAfter < time.Second {
		retryAfter = time.Second
	}
	// Ceiling division written as quotient + remainder check so a very
	// large duration cannot overflow the way (d + time.Second - 1) could.
	secs := int64(retryAfter / time.Second)
	if retryAfter%time.Second != 0 {
		secs++
	}
	// The header has to be set before writeSQSError, which commits the
	// response headers via WriteHeader.
	w.Header().Set("Retry-After", strconv.FormatInt(secs, 10))
	writeSQSError(w, http.StatusBadRequest, sqsErrThrottling,
		"Rate exceeded for queue '"+queue+"' action '"+action+"'")
}
256 changes: 256 additions & 0 deletions adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"io"
"log/slog"
"math"
"net/http"
"net/url"
"regexp"
Expand Down Expand Up @@ -92,6 +93,42 @@ type sqsQueueMeta struct {
// commit time and trust HLC monotonicity to keep ordering sane.
CreatedAtMillis int64 `json:"created_at_millis,omitempty"`
LastModifiedAtMillis int64 `json:"last_modified_at_millis,omitempty"`
// Throttle is the per-queue rate-limit configuration. nil disables
// throttling (default). Set via SetQueueAttributes with the AWS-style
// names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
// Persisted on the meta so a leader failover loads the configuration
// along with the rest of the queue.
Throttle *sqsQueueThrottle `json:"throttle,omitempty"`
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Compare throttle settings in CreateQueue idempotency checks

Throttle configuration was added to queue metadata, but CreateQueue idempotency still relies on attributesEqual, which does not include Throttle* fields. As a result, creating an existing queue with different throttle settings can be treated as “same attributes” and incorrectly return success instead of QueueNameExists, hiding configuration drift during provisioning.

Useful? React with 👍 / 👎.

}

// sqsQueueThrottle holds the token-bucket settings for a single queue.
// A queue carries three independent (capacity, refill-per-second) pairs:
//
//   - Send:    SendMessage[Batch].
//   - Recv:    ReceiveMessage / DeleteMessage[Batch] /
//     ChangeMessageVisibility[Batch], charged on the consumer side.
//   - Default: catch-all for any future non-Send/Recv verb that gets
//     wired into the throttle path.
//
// The short Send*/Recv*/Default* spelling is what appears in the JSON
// contract and in the AWS-style attribute names. The in-memory bucketKey
// uses the canonical action vocabulary ("Send" | "Receive" | "*");
// throttleConfigToBucketAction and bucketActionForCharge translate
// between the two.
type sqsQueueThrottle struct {
	SendCapacity           float64 `json:"send_capacity,omitempty"`
	SendRefillPerSecond    float64 `json:"send_refill_per_second,omitempty"`
	RecvCapacity           float64 `json:"recv_capacity,omitempty"`
	RecvRefillPerSecond    float64 `json:"recv_refill_per_second,omitempty"`
	DefaultCapacity        float64 `json:"default_capacity,omitempty"`
	DefaultRefillPerSecond float64 `json:"default_refill_per_second,omitempty"`
}

// IsEmpty reports whether the configuration is the no-op: either the
// receiver is nil or every one of the six fields equals zero. An empty
// configuration means throttling is disabled for the queue.
func (t *sqsQueueThrottle) IsEmpty() bool {
	// Comparing against the zero value checks all six float fields at
	// once; the nil test short-circuits before the dereference.
	return t == nil || *t == (sqsQueueThrottle{})
}

var storedSQSMetaPrefix = []byte{0x00, 'S', 'Q', 0x01}
Expand Down Expand Up @@ -384,6 +421,19 @@ var sqsAttributeAppliers = map[string]attributeApplier{
m.ContentBasedDedup = b
return nil
},
// Throttle* are non-AWS extensions for per-queue rate limiting,
// see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
// Each accepts a non-negative float64; the cross-attribute
// validation that enforces both-zero-or-both-positive on each
// (capacity, refill) pair, capacity ≥ refill, hard ceiling, and
// the capacity ≥ 10 floor for batch-charging buckets runs in
// validateThrottleConfig after every Throttle* applier has fired.
"ThrottleSendCapacity": applyThrottleField(throttleSetSendCapacity),
"ThrottleSendRefillPerSecond": applyThrottleField(throttleSetSendRefill),
"ThrottleRecvCapacity": applyThrottleField(throttleSetRecvCapacity),
"ThrottleRecvRefillPerSecond": applyThrottleField(throttleSetRecvRefill),
"ThrottleDefaultCapacity": applyThrottleField(throttleSetDefaultCapacity),
"ThrottleDefaultRefillPerSecond": applyThrottleField(throttleSetDefaultRefill),
"RedrivePolicy": func(m *sqsQueueMeta, v string) error {
// Validate the policy at attribute-apply time so a malformed
// RedrivePolicy never makes it onto the queue meta record. The
Expand Down Expand Up @@ -419,6 +469,156 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
return err
}
}
// Throttle* validation has to run after every applier so the
// pair-wise rules (both-zero-or-both-positive, capacity ≥ refill,
// capacity ≥ 10 for batch buckets) see the post-update meta as a
// whole. Running per-applier would reject a valid two-attribute
// update (e.g. SendCapacity + SendRefillPerSecond) on the first
// applier because the second value is not yet present.
if err := validateThrottleConfig(meta); err != nil {
return err
}
return nil
}

// applyThrottleField adapts a single-field setter into an
// attributeApplier for one Throttle* attribute.
//
// The returned applier parses the wire string with parseThrottleFloat
// (which owns the numeric, non-negative and hard-ceiling checks) and
// returns that error untouched on failure, leaving the meta unmodified.
// On success it allocates meta.Throttle if it is still nil and passes
// the parsed value to set. Cross-field rules are not checked here; they
// run later in validateThrottleConfig.
func applyThrottleField(set func(*sqsQueueThrottle, float64)) attributeApplier {
	return func(m *sqsQueueMeta, v string) error {
		parsed, parseErr := parseThrottleFloat(v)
		if parseErr != nil {
			return parseErr
		}
		// Allocate lazily so a meta that never sees a Throttle*
		// attribute keeps a nil Throttle pointer.
		cfg := m.Throttle
		if cfg == nil {
			cfg = &sqsQueueThrottle{}
			m.Throttle = cfg
		}
		set(cfg, parsed)
		return nil
	}
}

// parseThrottleFloat parses the wire string of a Throttle* attribute
// into a float64 after trimming surrounding whitespace.
//
// It returns the parsed value when it is finite, non-negative and no
// greater than throttleHardCeilingPerSecond. Otherwise it returns 0 and
// an InvalidAttributeValue API error (HTTP 400) whose message names the
// cause, so the operator sees it without grepping the server log:
//
//   - the string is not a parseable number (including out-of-range
//     literals that strconv.ParseFloat rejects),
//   - the value is NaN, ±Inf or negative,
//   - the value exceeds the hard ceiling.
//
// The ceiling quoted in the last message is rendered from
// throttleHardCeilingPerSecond rather than hard-coded, so the text
// cannot drift from the limit that is actually enforced.
func parseThrottleFloat(value string) (float64, error) {
	f, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
	if err != nil {
		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			"throttle attribute must be a non-negative number")
	}
	// ParseFloat accepts "NaN" and "Inf" spellings, and NaN compares
	// false against everything, so it needs its own explicit test.
	if math.IsNaN(f) || math.IsInf(f, 0) || f < 0 {
		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			"throttle attribute must be finite and non-negative")
	}
	if f > throttleHardCeilingPerSecond {
		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			"throttle attribute exceeds hard ceiling "+
				strconv.FormatFloat(throttleHardCeilingPerSecond, 'f', -1, 64))
	}
	return f, nil
}

// The throttleSet* functions each store one parsed value into the
// matching field of a sqsQueueThrottle. They exist so that every
// Throttle* entry in the attribute table is a single
// applyThrottleField(...) call. They are plain named functions rather
// than inline closures so other callers can reuse them without
// re-creating the closure boilerplate.

// throttleSetSendCapacity stores f as the Send bucket capacity.
func throttleSetSendCapacity(t *sqsQueueThrottle, f float64) {
	t.SendCapacity = f
}

// throttleSetSendRefill stores f as the Send bucket refill rate per second.
func throttleSetSendRefill(t *sqsQueueThrottle, f float64) {
	t.SendRefillPerSecond = f
}

// throttleSetRecvCapacity stores f as the Recv bucket capacity.
func throttleSetRecvCapacity(t *sqsQueueThrottle, f float64) {
	t.RecvCapacity = f
}

// throttleSetRecvRefill stores f as the Recv bucket refill rate per second.
func throttleSetRecvRefill(t *sqsQueueThrottle, f float64) {
	t.RecvRefillPerSecond = f
}

// throttleSetDefaultCapacity stores f as the Default bucket capacity.
func throttleSetDefaultCapacity(t *sqsQueueThrottle, f float64) {
	t.DefaultCapacity = f
}

// throttleSetDefaultRefill stores f as the Default bucket refill rate per second.
func throttleSetDefaultRefill(t *sqsQueueThrottle, f float64) {
	t.DefaultRefillPerSecond = f
}

// validateThrottleConfig enforces the §3.2 cross-attribute rules on
// the post-applier meta. The single-field constraints (non-negative,
// hard ceiling) are already enforced inside parseThrottleFloat;
// what's left is pair-wise:
//
// - Each (capacity, refill) pair must be both zero (action disabled)
// or both positive. A capacity-without-refill bucket would never
// refill; a refill-without-capacity bucket has no burst headroom.
// - capacity ≥ refill, otherwise the bucket can never burst above
// steady state (the bucket can only ever hold one second's worth).
// - For action buckets that cover a batch verb (Send, Recv) the
// capacity must be ≥ throttleMinBatchCapacity (== 10). A capacity
// below the largest single charge is permanently unserviceable
// for full batches.
//
// If meta.Throttle is empty (the IsEmpty short-circuit) the function
// also drops the empty struct so a round-trip GetQueueAttributes
// reports the queue as unthrottled rather than zero-valued. Mirrors
// how nil throttle on the meta means "not configured".
func validateThrottleConfig(meta *sqsQueueMeta) error {
if meta.Throttle == nil {
return nil
}
t := meta.Throttle
if err := validateThrottlePair("ThrottleSend", t.SendCapacity, t.SendRefillPerSecond, true); err != nil {
return err
}
if err := validateThrottlePair("ThrottleRecv", t.RecvCapacity, t.RecvRefillPerSecond, true); err != nil {
return err
}
// Default* covers any future non-Send/Recv verb that gets wired
// into the throttle path — no batch verb is in scope today, so the
// minimum-batch-capacity check is off for this bucket.
if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, false); err != nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Enforce batch-capacity floor on fallback Default throttle bucket

validateThrottleConfig disables the batch-capacity check for ThrottleDefault* (requireBatchCapacity=false), but resolveActionConfig routes Send/Receive traffic to Default whenever Send*/Recv* are unset. That means configs like ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 are accepted, yet full SendMessageBatch/DeleteMessageBatch requests (charge=10) can never succeed because the bucket can never accumulate 10 tokens. This creates a permanently unserviceable, hard-throttled batch path from a configuration the validator currently allows.

Useful? React with 👍 / 👎.

return err
}
if t.IsEmpty() {
// All-zero post-apply means the operator wrote a "disable"
// command; canonicalise to nil so downstream code hits the
// nil-throttle short-circuit rather than the IsEmpty branch.
meta.Throttle = nil
}
return nil
}

// addThrottleAttributes copies the configured Throttle* pairs from t
// into out using the wire-side Send*/Recv*/Default* attribute names.
//
// Nothing is written when t is empty (nil or all-zero). Otherwise, for
// each bucket whose capacity is strictly positive, both the capacity
// and the refill-per-second values are written, formatted with the
// shortest representation that round-trips ('g', precision -1).
// Buckets whose capacity is not positive are omitted entirely.
func addThrottleAttributes(out map[string]string, t *sqsQueueThrottle) {
	if t.IsEmpty() {
		return
	}
	render := func(v float64) string {
		return strconv.FormatFloat(v, 'g', -1, 64)
	}
	buckets := [...]struct {
		capacityKey string
		refillKey   string
		capacity    float64
		refill      float64
	}{
		{"ThrottleSendCapacity", "ThrottleSendRefillPerSecond", t.SendCapacity, t.SendRefillPerSecond},
		{"ThrottleRecvCapacity", "ThrottleRecvRefillPerSecond", t.RecvCapacity, t.RecvRefillPerSecond},
		{"ThrottleDefaultCapacity", "ThrottleDefaultRefillPerSecond", t.DefaultCapacity, t.DefaultRefillPerSecond},
	}
	for _, b := range buckets {
		// Capacity alone decides whether the pair is surfaced.
		if b.capacity > 0 {
			out[b.capacityKey] = render(b.capacity)
			out[b.refillKey] = render(b.refill)
		}
	}
}

// validateThrottlePair checks one (capacity, refill) pair and returns
// nil when it is acceptable or an InvalidAttributeValue API error
// (HTTP 400) describing the first rule it breaks.
//
// prefix is the attribute-name stem ("ThrottleSend", "ThrottleRecv",
// "ThrottleDefault") used to build the message. Rules, in order:
//
//   - both values zero: the bucket is disabled, which is valid;
//   - exactly one value zero: rejected, the pair must be both zero or
//     both positive;
//   - capacity < refill: rejected, capacity is the burst cap;
//   - requireBatchCapacity and capacity < throttleMinBatchCapacity:
//     rejected, because a full batch could never be served.
//
// requireBatchCapacity gates the last rule so callers can leave it off
// for buckets that do not cover a batch verb. The minimum quoted in the
// message is rendered from throttleMinBatchCapacity rather than
// hard-coded, so the text cannot drift from the limit that is enforced.
func validateThrottlePair(prefix string, capacity, refill float64, requireBatchCapacity bool) error {
	switch {
	case capacity == 0 && refill == 0:
		return nil
	case capacity == 0 || refill == 0:
		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			prefix+"Capacity and "+prefix+"RefillPerSecond must both be zero (disabled) or both positive")
	case capacity < refill:
		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			prefix+"Capacity must be ≥ "+prefix+"RefillPerSecond (capacity is the burst cap; below refill the bucket cannot accumulate)")
	case requireBatchCapacity && capacity < throttleMinBatchCapacity:
		minTokens := strconv.FormatFloat(throttleMinBatchCapacity, 'f', -1, 64)
		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
			prefix+"Capacity must be ≥ "+minTokens+" — batch verbs (SendMessageBatch / DeleteMessageBatch) charge up to "+minTokens+" tokens per call; a smaller capacity makes every full batch permanently unserviceable")
	}
	return nil
}

Expand All @@ -440,6 +640,13 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
if a == nil || b == nil {
return false
}
return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle)
}

// baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set.
// Split from attributesEqual so adding fields per phase does not
// push the function over the cyclop ceiling.
func baseAttributesEqual(a, b *sqsQueueMeta) bool {
return a.IsFIFO == b.IsFIFO &&
a.ContentBasedDedup == b.ContentBasedDedup &&
a.VisibilityTimeoutSeconds == b.VisibilityTimeoutSeconds &&
Expand All @@ -450,6 +657,27 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
a.RedrivePolicy == b.RedrivePolicy
}

// throttleConfigEqual reports whether two throttle configurations are
// equivalent for the CreateQueue idempotency check. Leaving the
// throttle fields out of that check would let a re-create with
// different limits be treated as idempotent and silently keep the old
// limits.
//
// Two empty configurations (nil or all-zero) are equal to each other,
// an empty one never equals a non-empty one, and two non-empty ones are
// equal when all six fields match.
func throttleConfigEqual(a, b *sqsQueueThrottle) bool {
	switch aEmpty, bEmpty := a.IsEmpty(), b.IsEmpty(); {
	case aEmpty && bEmpty:
		return true
	case aEmpty || bEmpty:
		return false
	}
	// Both are non-empty here, hence non-nil; struct equality compares
	// every field of the two configurations.
	return *a == *b
}

// ------------------------ storage primitives ------------------------

func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 {
Expand Down Expand Up @@ -623,6 +851,14 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
// Drop in-memory throttle buckets belonging to this queue so a
// same-name CreateQueue immediately after this delete starts with
// a fresh full-capacity bucket, not the stale balance from the
// previous incarnation. Without this step the old throttle would
// keep enforcing for up to the idle-evict window (default 1 h),
// surprising operators who use DeleteQueue+CreateQueue to reset
// queue state.
s.throttle.invalidateQueue(name)
// SQS DeleteQueue returns 200 with an empty body.
writeSQSJSON(w, map[string]any{})
}
Expand Down Expand Up @@ -954,6 +1190,12 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection,
if meta.RedrivePolicy != "" {
all["RedrivePolicy"] = meta.RedrivePolicy
}
// Throttle* are non-AWS extensions. Surfacing them in
// GetQueueAttributes lets operators read back what they set; SDKs
// that strictly validate the attribute set will ignore unknown
// keys. Extracted into a helper so queueMetaToAttributes stays
// under the cyclop ceiling.
addThrottleAttributes(all, meta.Throttle)
if selection.expandAll {
return all
}
Expand Down Expand Up @@ -989,6 +1231,20 @@ func (s *SQSServer) setQueueAttributes(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
// Drop the in-memory bucket entries belonging to this queue *after*
// the Raft commit so the next request rebuilds from the freshly
// committed throttle config. Gated on whether the request actually
// touched a Throttle* attribute — an unconditional invalidate
// would reset the bucket on every unrelated SetQueueAttributes
// (e.g. VisibilityTimeout-only update), giving any caller a way to
// silently restore a noisy tenant's burst capacity by writing a
// no-op SetQueueAttributes (Codex P1 on PR #679). The bucket
// reconciliation in loadOrInit also catches a stale bucket if a
// throttle change slips past this gate (e.g. via a future admin
// path), so the gating here is purely a hot-path optimisation.
if throttleAttributesPresent(in.Attributes) {
s.throttle.invalidateQueue(name)
}
writeSQSJSON(w, map[string]any{})
}

Expand Down
Loading
Loading