Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
341b0de
docs(sqs): proposals for Phase 3.C (throttling) + 3.D (split-queue FIFO)
bootjp Apr 26, 2026
6fa02fa
docs(sqs): address review feedback on Phase 3.C / 3.D proposals
bootjp Apr 26, 2026
f289d6c
docs(sqs): address Claude follow-up review on Phase 3.C / 3.D proposals
bootjp Apr 26, 2026
d424d18
docs(sqs): address third Claude review on Phase 3.C / 3.D proposals
bootjp Apr 26, 2026
f6bd33a
docs(sqs): address fourth Claude review on Phase 3.C / 3.D proposals
bootjp Apr 26, 2026
624d411
docs(sqs): address fifth-round Claude review (DeleteQueue, WaitTimeSe…
bootjp Apr 26, 2026
499f2bc
docs(sqs): address sixth-round Codex P1 + Claude low (capacity, downg…
bootjp Apr 26, 2026
0c22d1c
docs(sqs): address seventh-round Codex P1 + Claude low (action keys, …
bootjp Apr 26, 2026
f18015b
docs(sqs): refine immutability gate to CreateQueue + clarify §3.4 router
bootjp Apr 26, 2026
6787de1
docs(sqs): address ninth-round Claude lows (4-byte handle, invalidati…
bootjp Apr 26, 2026
14b4d88
docs(sqs): address tenth-round Claude lows (all-or-nothing semantics,…
bootjp Apr 26, 2026
829cf97
docs(sqs): add WaitTimeSeconds shared-deadline timing test (§9 item 5)
bootjp Apr 26, 2026
fc7c3cb
docs(sqs): address two open Codex P1 mediums (control-plane dedup gat…
bootjp Apr 26, 2026
d246cc3
docs(sqs): address thirteenth-round Claude lows (dead-code clarificat…
bootjp Apr 26, 2026
e105815
feat(sqs): per-queue throttling (Phase 3.C)
bootjp Apr 26, 2026
e3530e9
fix(sqs/throttle): address PR #679 review feedback (Codex P1/P2 + Gem…
bootjp Apr 27, 2026
c6d3a68
fix(sqs/throttle): drop dead sweep state + add throttle idempotency t…
bootjp Apr 27, 2026
af2d12b
docs(sqs/throttle): refresh stale comments on idle-evict constants
bootjp Apr 27, 2026
29c7296
fix(sqs/throttle): use CompareAndDelete in loadOrInit reconciliation …
bootjp Apr 27, 2026
ad67145
feat(sqs): HT-FIFO schema + validators + dormancy gate (Phase 3.D PR 2)
bootjp Apr 26, 2026
efe7e56
fix(sqs/htfifo): address PR #681 review feedback (Gemini medium x3)
bootjp Apr 27, 2026
0b41e0f
fix(sqs/htfifo): PR #681 round 2 — FIFO-only PartitionCount + test cl…
bootjp Apr 27, 2026
416b112
feat(sqs): HT-FIFO schema + validators + dormancy gate (Phase 3.D PR …
bootjp Apr 27, 2026
576cf8c
fix(sqs/throttle): Default* batch floor + sweep eviction race (PR #67…
bootjp Apr 27, 2026
7d7e620
fix(sqs/throttle): close orphan-bucket window via evicted flag (PR #6…
bootjp Apr 27, 2026
e9c20de
fix(sqs/throttle): invalidateQueue lock-then-delete (PR #679 round 6.1)
bootjp Apr 27, 2026
dbd7a66
fix(sqs/htfifo): normalise PartitionCount in idempotency check (PR #6…
bootjp Apr 27, 2026
dcc5574
fix(sqs/htfifo): normalise PartitionCount in immutability check too (…
bootjp Apr 27, 2026
1ae4aa3
feat(sqs): per-queue throttling (Phase 3.C) (#679)
bootjp Apr 27, 2026
2edf9d4
Merge remote-tracking branch 'origin/main' into docs/sqs-phase3-propo…
bootjp Apr 27, 2026
53ee4d4
fix(sqs/throttle): generation-keyed bucket map (PR #664 round 7 / Cod…
bootjp Apr 27, 2026
f664d28
docs(sqs): align design with implementation; refactor partitionFor to…
bootjp Apr 27, 2026
2d5ea6c
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 27, 2026
044128e
docs(sqs): correct stale Default* exemption + describe all 3 failover…
bootjp Apr 27, 2026
34cb485
fix(sqs/throttle): incarnation key + no-op invalidate gate + fail-clo…
bootjp Apr 28, 2026
72d6485
docs(sqs/throttle): correct chargeQueue comment — batch handlers use …
bootjp Apr 28, 2026
d1ce7c1
fix(sqs/throttle): hold bucket lock across reconciliation evict + fla…
bootjp Apr 28, 2026
cae2469
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 28, 2026
ed5d065
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 28, 2026
1725844
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 28, 2026
ff76340
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 28, 2026
642ec0d
fix(sqs): address CodeRabbit/Copilot findings from round 11/12 review…
github-actions[bot] Apr 28, 2026
55297f6
fix(sqs): chargeBucket fail-closed + perMessageGroupId requires Parti…
bootjp Apr 28, 2026
7e43488
fix(sqs): drop nolint:gosec + 3 doc fixes (PR #664 round 13 / Claude …
bootjp Apr 28, 2026
3f57be8
Merge branch 'main' into docs/sqs-phase3-proposals
bootjp Apr 28, 2026
8d6e214
fix(sqs): drop PR-ref from comment + add §9 internal-header strip tes…
bootjp Apr 28, 2026
213ded4
style(sqs): strip PR/round attributions from .go comments per CLAUDE.…
bootjp Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"time"

"github.com/bootjp/elastickv/kv"
Expand Down Expand Up @@ -55,6 +56,11 @@ const (
sqsErrInternalFailure = "InternalFailure"
sqsErrServiceUnavailable = "ServiceUnavailable"
sqsErrMalformedRequest = "MalformedQueryString"
// sqsErrThrottling is the per-queue rate-limit rejection code.
// Returned with HTTP 400 and a Retry-After header derived from the
// bucket's refillRate + the request's charge count (see
// computeRetryAfter in sqs_throttle.go for the formula).
sqsErrThrottling = "Throttling"
)

type SQSServerOption func(*SQSServer)
Expand All @@ -76,6 +82,11 @@ type SQSServer struct {
// goroutines without ordering between them.
reaperCtx context.Context
reaperCancel context.CancelFunc
// throttle is the per-queue rate-limit bucket store. Always
// non-nil; charge() short-circuits when the queue's meta has no
// throttle config so unconfigured queues pay one nil-check per
// request and nothing else (see sqs_throttle.go).
throttle *bucketStore
}

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
Expand All @@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
coordinator: coordinate,
reaperCtx: reaperCtx,
reaperCancel: reaperCancel,
throttle: newBucketStoreDefault(),
}
s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
sqsCreateQueueTarget: s.createQueue,
Expand Down Expand Up @@ -131,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin

func (s *SQSServer) Run() error {
s.startReaper(s.reaperCtx)
// Throttle bucket idle-evict runs on a background ticker so the
// request hot path never pays the O(N) sweep cost. Cleaned up by
// the same reaperCtx cancellation that stops the message reaper.
go s.throttle.runSweepLoop(s.reaperCtx)
if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -302,3 +318,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(resp)
}

// writeSQSThrottlingError emits the rate-limit rejection envelope: HTTP 400
// with the sqsErrThrottling code in the JSON error body produced by
// writeSQSError, plus a Retry-After header carrying an integer-second wait.
//
// queue and action are interpolated into the operator-visible message.
// The action argument is the bucket-action vocabulary
// ("Send" | "Receive" | "*") so the message names the bucket that ran
// out, not just the queue.
//
// retryAfter is floored at one second and otherwise rounded UP to whole
// seconds. Retry-After can only carry whole seconds, so truncating
// (1.5s -> "1") would invite the client back before the wait the caller
// computed has elapsed; rounding up never under-reports it.
func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) {
	if retryAfter < time.Second {
		retryAfter = time.Second
	}
	secs := int64(retryAfter / time.Second)
	if retryAfter%time.Second != 0 {
		// Fractional remainder: bump to the next whole second.
		secs++
	}
	// The header has to be set before writeSQSError runs, because that
	// helper calls WriteHeader and later header changes are not sent.
	w.Header().Set("Retry-After", strconv.FormatInt(secs, 10))
	writeSQSError(w, http.StatusBadRequest, sqsErrThrottling,
		"Rate exceeded for queue '"+queue+"' action '"+action+"'")
}
541 changes: 515 additions & 26 deletions adapter/sqs_catalog.go

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions adapter/sqs_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func TestSQSServer_CatalogCreateIsIdempotent(t *testing.T) {
if got, _ := out3["__type"].(string); got != sqsErrQueueNameExists {
t.Fatalf("differing-attrs error type: got %q want %q", got, sqsErrQueueNameExists)
}

// Fourth call: same name, same non-throttle attrs as the original
// create, but different Throttle* values. The original create had
// no Throttle config; this one adds one. throttleConfigEqual must
// notice the diff and the call must reject as QueueNameExists.
// Without this case a bug in throttleConfigEqual (e.g. always
// returning true) would slip past the existing VisibilityTimeout-
// only test.
withThrottle := map[string]any{
"QueueName": "idempotent",
"Attributes": map[string]string{
"VisibilityTimeout": "60",
"ThrottleSendCapacity": "10",
"ThrottleSendRefillPerSecond": "1",
},
}
status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle)
if status4 != http.StatusBadRequest {
t.Fatalf("re-create with added Throttle*: got %d want 400; body %v", status4, out4)
}
if got, _ := out4["__type"].(string); got != sqsErrQueueNameExists {
t.Fatalf("Throttle*-diff error type: got %q want %q", got, sqsErrQueueNameExists)
}
}

func TestSQSServer_CatalogGetAndSetAttributes(t *testing.T) {
Expand Down
59 changes: 53 additions & 6 deletions adapter/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,33 +294,67 @@ type sqsChangeVisibilityInput struct {

// ------------------------ handlers ------------------------

func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
// prepareSendMessage decodes the SendMessage payload and resolves
// the queue name. Throttle charging happens after the meta load in
// validateSend so we don't pay an extra meta read just to discover
// throttling is off.
func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (sqsSendMessageInput, string, bool) {
var in sqsSendMessageInput
if err := decodeSQSJSONInput(r, &in); err != nil {
writeSQSErrorFromErr(w, err)
return
return in, "", false
}
queueName, err := queueNameFromURL(in.QueueUrl)
if err != nil {
writeSQSErrorFromErr(w, err)
return
return in, "", false
}
return in, queueName, true
}

// validateSend loads queue meta, runs the throttle charge against
// the loaded throttle config (no extra meta read), then validates
// message attributes / FIFO params and resolves the delay. Returns
// ok=false if any step has already written the error response.
//
// Throttle check sits AFTER the meta load (so we have the throttle
// config) and AFTER the QueueDoesNotExist branch (so a missing
// queue is reported as 400 QueueDoesNotExist, not as a Throttling
// 400 against a non-existent bucket). It still sits OUTSIDE the
// OCC transaction (§4.2): a rejected request never reaches the
// coordinator.
func (s *SQSServer) validateSend(w http.ResponseWriter, r *http.Request, queueName string, in sqsSendMessageInput) (*sqsQueueMeta, uint64, int64, bool) {
meta, readTS, apiErr := s.loadQueueMetaForSend(r.Context(), queueName, []byte(in.MessageBody))
if apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle, meta.Incarnation) {
return nil, 0, 0, false
}
if apiErr := validateMessageAttributes(in.MessageAttributes); apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
if apiErr := validateSendFIFOParams(meta, in); apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
delay, apiErr := resolveSendDelay(meta, in.DelaySeconds)
if apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return nil, 0, 0, false
}
return meta, readTS, delay, true
}

func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
in, queueName, ok := s.prepareSendMessage(w, r)
if !ok {
return
}
meta, readTS, delay, ok := s.validateSend(w, r, queueName, in)
if !ok {
return
}
if meta.IsFIFO {
Expand Down Expand Up @@ -530,6 +564,13 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) {
writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
return
}
// Throttle check uses the loaded meta's throttle config so we
// don't pay an extra meta read just to discover throttling is
// off. Sits AFTER the QueueDoesNotExist branch — a missing queue
// should not consume a Recv token.
if !s.chargeQueueWithThrottle(w, queueName, bucketActionReceive, 1, meta.Throttle, meta.Incarnation) {
return
}
max, maxErr := resolveReceiveMaxMessages(in.MaxNumberOfMessages)
if maxErr != nil {
writeSQSErrorFromErr(w, maxErr)
Expand Down Expand Up @@ -1106,6 +1147,9 @@ func (s *SQSServer) deleteMessage(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
return
}
if err := s.deleteMessageWithRetry(r.Context(), queueName, handle); err != nil {
writeSQSErrorFromErr(w, err)
return
Expand Down Expand Up @@ -1258,6 +1302,9 @@ func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Reque
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
return
}
if err := s.changeVisibilityWithRetry(r.Context(), queueName, handle, timeout); err != nil {
writeSQSErrorFromErr(w, err)
return
Expand Down
9 changes: 9 additions & 0 deletions adapter/sqs_messages_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (s *SQSServer) sendMessageBatch(w http.ResponseWriter, r *http.Request) {
"total batch payload exceeds 262144 bytes")
return
}
if !s.chargeQueue(w, r, queueName, bucketActionSend, throttleChargeCount(len(in.Entries))) {
return
}

successful, failed, err := s.sendMessageBatchWithRetry(r.Context(), queueName, in.Entries)
if err != nil {
Expand Down Expand Up @@ -453,6 +456,9 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
return
}

successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
failed := make([]sqsBatchResultErrorEntry, 0)
Expand Down Expand Up @@ -519,6 +525,9 @@ func (s *SQSServer) changeMessageVisibilityBatch(w http.ResponseWriter, r *http.
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
return
}

successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
failed := make([]sqsBatchResultErrorEntry, 0)
Expand Down
Loading
Loading