
Commit bc6dbd4

docs(sqs): proposals for Phase 3.C (throttling) + 3.D (split-queue FIFO) (#664)
## Summary

**Docs-only PR.** Two design proposals for the remaining Phase 3 SQS items per [`docs/design/2026_04_24_proposed_sqs_compatible_adapter.md`](https://github.com/bootjp/elastickv/blob/main/docs/design/2026_04_24_proposed_sqs_compatible_adapter.md) Section 14 (Phase 3 bullets). Both items were explicitly called out as needing separate design docs before any implementation work; this PR lands those proposals so the implementation PRs have a reviewed architecture to build on.

### 3.C — Per-queue throttling and tenant fairness ([proposal](docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md))

Per-queue token-bucket throttling configured on queue meta (no separate keyspace), evaluated at the SQS adapter layer on the leader (no Raft commit per request), and surfaced as the AWS `Throttling` error envelope so SDK retry/backoff just works.

Key decisions:

- **Default-off.** Existing queues are unaffected; operators opt in per queue via `SetQueueAttributes`.
- **Per-action buckets** (Send / Receive / Default) so a slow consumer cannot pin the producer.
- **Per-leader buckets, no replication.** Worst case on failover is one extra burst on the new leader. That is acceptable, matching AWS-equivalent behaviour at region failover boundaries; replicating bucket state would cost a Raft commit per `SendMessage`.
- **Batch verbs charge by entry count**, not call count, with all-or-nothing rejection (matches AWS).
- **Admin-only configuration plane.** Standard SQS clients see `InvalidAttributeName` on the `Throttle*` attributes (matches AWS behaviour for unknown attributes); the data-plane enforcement runs for everyone.

### 3.D — Split-queue FIFO (high-throughput FIFO) ([proposal](docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md))

Per-`MessageGroupId` hash partitioning across multiple Raft groups, mirroring AWS High Throughput FIFO. Within-group ordering is preserved; across-group throughput scales with the partition count.
Key decisions:

- **Existing single-partition FIFO queues stay byte-identical.** The `PartitionCount = 0` path is the legacy layout; no migration runs implicitly.
- **Power-of-two partition counts only** (1, 2, 4, 8, 16, 32) so the routing step is `hash & (N-1)` and future offline rebuilds stay tractable.
- **Partition count is immutable after the first SendMessage.** Live re-partitioning would break ordering for in-flight messages of every group whose hash bucket changed; out of scope.
- **Multi-PR rollout plan** with an explicit "gate of no return" called out at PR 5 (the data-plane PR). PRs 1–4 are reversible no-ops on data layout; once a partitioned FIFO holds real data, rollback means draining and recreating the queue.
- **FNV-1a hash**, deterministic across processes, Go versions, and architectures. The risk of an attacker-controlled `MessageGroupId` pinning all traffic to one partition is documented and accepted (the feature is for cooperative operators).

## Test plan

- [x] Markdown renders correctly on GitHub (manually previewed).
- [x] Cross-references resolve. The partial-doc rename in PR #659 is in flight; both proposals reference the renamed filename, and the links resolve once #659 merges. Until then they hit the current `_proposed_` filename, a 1-character mismatch that is fine to leave for now.
- [x] No code changes; CI is irrelevant beyond the markdown lint pass.

## Self-review

This is a docs-only PR, so the 5-lens self-review collapses to:

1. **Data loss / Concurrency / Performance / Consistency** — N/A, no code touched.
2. **Test coverage** — N/A, no code touched. The proposals themselves include a Testing Strategy section (§6 / §9) so the implementation PRs have explicit acceptance criteria.

## Stacking

This PR is **independent** of #650, #659, and #662. Branched from current `main`. Merge whenever ready — landing the proposal docs early lets reviewers comment on the architecture before the implementation PRs go up.
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Documentation**
  * Added detailed designs for optional per-queue token-bucket throttling (rules, batch semantics, AWS-shaped Throttling errors with Retry-After, metrics, rollout default-off) and for HT-FIFO "split-queue" partitioning (routing, immutability, gating/rollout).
* **New Features**
  * Optional queue-level send/receive throttling with batch-aware charging and Retry-After; HT-FIFO partitioning with deterministic partition routing and immutable partition attributes.
* **Tests**
  * Extensive unit and integration tests for throttling, HT-FIFO partitioning, attribute validation, immutability, idempotency, and cache invalidation.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 67f40c5 + 213ded4 commit bc6dbd4

15 files changed

Lines changed: 4304 additions & 39 deletions

adapter/sqs.go

Lines changed: 32 additions & 0 deletions
```diff
@@ -5,6 +5,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"strconv"
 	"time"

 	"github.com/bootjp/elastickv/kv"
@@ -55,6 +56,11 @@ const (
 	sqsErrInternalFailure    = "InternalFailure"
 	sqsErrServiceUnavailable = "ServiceUnavailable"
 	sqsErrMalformedRequest   = "MalformedQueryString"
+	// sqsErrThrottling is the per-queue rate-limit rejection code.
+	// Returned with HTTP 400 and a Retry-After header derived from the
+	// bucket's refillRate + the request's charge count (see
+	// computeRetryAfter in sqs_throttle.go for the formula).
+	sqsErrThrottling = "Throttling"
 )

 type SQSServerOption func(*SQSServer)
@@ -76,6 +82,11 @@ type SQSServer struct {
 	// goroutines without ordering between them.
 	reaperCtx    context.Context
 	reaperCancel context.CancelFunc
+	// throttle is the per-queue rate-limit bucket store. Always
+	// non-nil; charge() short-circuits when the queue's meta has no
+	// throttle config so unconfigured queues pay one nil-check per
+	// request and nothing else (see sqs_throttle.go).
+	throttle *bucketStore
 }

 // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
@@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
 		coordinator:  coordinate,
 		reaperCtx:    reaperCtx,
 		reaperCancel: reaperCancel,
+		throttle:     newBucketStoreDefault(),
 	}
 	s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
 		sqsCreateQueueTarget: s.createQueue,
@@ -131,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin

 func (s *SQSServer) Run() error {
 	s.startReaper(s.reaperCtx)
+	// Throttle bucket idle-evict runs on a background ticker so the
+	// request hot path never pays the O(N) sweep cost. Cleaned up by
+	// the same reaperCtx cancellation that stops the message reaper.
+	go s.throttle.runSweepLoop(s.reaperCtx)
 	if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
 		return errors.WithStack(err)
 	}
@@ -302,3 +318,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin
 	w.WriteHeader(status)
 	_ = json.NewEncoder(w).Encode(resp)
 }
+
+// writeSQSThrottlingError emits the rate-limit rejection envelope: 400
+// + the AWS-shaped JSON error body + a Retry-After header carrying
+// the integer-second wait derived from the bucket's refill rate and
+// the request's charge count. The action argument is the bucket-action
+// vocabulary ("Send" | "Receive" | "*") so the operator-visible
+// message names the bucket that ran out, not just the queue.
+func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) {
+	if retryAfter < time.Second {
+		retryAfter = time.Second
+	}
+	secs := int(retryAfter / time.Second)
+	w.Header().Set("Retry-After", strconv.Itoa(secs))
+	writeSQSError(w, http.StatusBadRequest, sqsErrThrottling,
+		"Rate exceeded for queue '"+queue+"' action '"+action+"'")
+}
```

adapter/sqs_catalog.go

Lines changed: 515 additions & 26 deletions
Large diffs are not rendered by default.

adapter/sqs_catalog_test.go

Lines changed: 23 additions & 0 deletions
```diff
@@ -136,6 +136,29 @@ func TestSQSServer_CatalogCreateIsIdempotent(t *testing.T) {
 	if got, _ := out3["__type"].(string); got != sqsErrQueueNameExists {
 		t.Fatalf("differing-attrs error type: got %q want %q", got, sqsErrQueueNameExists)
 	}
+
+	// Fourth call: same name, same non-throttle attrs as the original
+	// create, but different Throttle* values. The original create had
+	// no Throttle config; this one adds one. throttleConfigEqual must
+	// notice the diff and the call must reject as QueueNameExists.
+	// Without this case a bug in throttleConfigEqual (e.g. always
+	// returning true) would slip past the existing VisibilityTimeout-
+	// only test.
+	withThrottle := map[string]any{
+		"QueueName": "idempotent",
+		"Attributes": map[string]string{
+			"VisibilityTimeout":           "60",
+			"ThrottleSendCapacity":        "10",
+			"ThrottleSendRefillPerSecond": "1",
+		},
+	}
+	status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle)
+	if status4 != http.StatusBadRequest {
+		t.Fatalf("re-create with added Throttle*: got %d want 400; body %v", status4, out4)
+	}
+	if got, _ := out4["__type"].(string); got != sqsErrQueueNameExists {
+		t.Fatalf("Throttle*-diff error type: got %q want %q", got, sqsErrQueueNameExists)
+	}
 }

 func TestSQSServer_CatalogGetAndSetAttributes(t *testing.T) {
```

adapter/sqs_messages.go

Lines changed: 53 additions & 6 deletions
```diff
@@ -294,33 +294,67 @@ type sqsChangeVisibilityInput struct {

 // ------------------------ handlers ------------------------

-func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
+// prepareSendMessage decodes the SendMessage payload and resolves
+// the queue name. Throttle charging happens after the meta load in
+// validateSend so we don't pay an extra meta read just to discover
+// throttling is off.
+func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (sqsSendMessageInput, string, bool) {
 	var in sqsSendMessageInput
 	if err := decodeSQSJSONInput(r, &in); err != nil {
 		writeSQSErrorFromErr(w, err)
-		return
+		return in, "", false
 	}
 	queueName, err := queueNameFromURL(in.QueueUrl)
 	if err != nil {
 		writeSQSErrorFromErr(w, err)
-		return
+		return in, "", false
 	}
+	return in, queueName, true
+}
+
+// validateSend loads queue meta, runs the throttle charge against
+// the loaded throttle config (no extra meta read), then validates
+// message attributes / FIFO params and resolves the delay. Returns
+// ok=false if any step has already written the error response.
+//
+// Throttle check sits AFTER the meta load (so we have the throttle
+// config) and AFTER the QueueDoesNotExist branch (so a missing
+// queue is reported as 400 QueueDoesNotExist, not as a Throttling
+// 400 against a non-existent bucket). It still sits OUTSIDE the
+// OCC transaction (§4.2): a rejected request never reaches the
+// coordinator.
+func (s *SQSServer) validateSend(w http.ResponseWriter, r *http.Request, queueName string, in sqsSendMessageInput) (*sqsQueueMeta, uint64, int64, bool) {
 	meta, readTS, apiErr := s.loadQueueMetaForSend(r.Context(), queueName, []byte(in.MessageBody))
 	if apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
-		return
+		return nil, 0, 0, false
+	}
+	if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle, meta.Incarnation) {
+		return nil, 0, 0, false
 	}
 	if apiErr := validateMessageAttributes(in.MessageAttributes); apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
-		return
+		return nil, 0, 0, false
 	}
 	if apiErr := validateSendFIFOParams(meta, in); apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
-		return
+		return nil, 0, 0, false
 	}
 	delay, apiErr := resolveSendDelay(meta, in.DelaySeconds)
 	if apiErr != nil {
 		writeSQSErrorFromErr(w, apiErr)
+		return nil, 0, 0, false
+	}
+	return meta, readTS, delay, true
+}
+
+func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
+	in, queueName, ok := s.prepareSendMessage(w, r)
+	if !ok {
+		return
+	}
+	meta, readTS, delay, ok := s.validateSend(w, r, queueName, in)
+	if !ok {
 		return
 	}
 	if meta.IsFIFO {
@@ -530,6 +564,13 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) {
 		writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
 		return
 	}
+	// Throttle check uses the loaded meta's throttle config so we
+	// don't pay an extra meta read just to discover throttling is
+	// off. Sits AFTER the QueueDoesNotExist branch — a missing queue
+	// should not consume a Recv token.
+	if !s.chargeQueueWithThrottle(w, queueName, bucketActionReceive, 1, meta.Throttle, meta.Incarnation) {
+		return
+	}
 	max, maxErr := resolveReceiveMaxMessages(in.MaxNumberOfMessages)
 	if maxErr != nil {
 		writeSQSErrorFromErr(w, maxErr)
@@ -1106,6 +1147,9 @@ func (s *SQSServer) deleteMessage(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
+		return
+	}
 	if err := s.deleteMessageWithRetry(r.Context(), queueName, handle); err != nil {
 		writeSQSErrorFromErr(w, err)
 		return
@@ -1258,6 +1302,9 @@ func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Reque
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
+		return
+	}
 	if err := s.changeVisibilityWithRetry(r.Context(), queueName, handle, timeout); err != nil {
 		writeSQSErrorFromErr(w, err)
 		return
```

adapter/sqs_messages_batch.go

Lines changed: 9 additions & 0 deletions
```diff
@@ -94,6 +94,9 @@ func (s *SQSServer) sendMessageBatch(w http.ResponseWriter, r *http.Request) {
 			"total batch payload exceeds 262144 bytes")
 		return
 	}
+	if !s.chargeQueue(w, r, queueName, bucketActionSend, throttleChargeCount(len(in.Entries))) {
+		return
+	}

 	successful, failed, err := s.sendMessageBatchWithRetry(r.Context(), queueName, in.Entries)
 	if err != nil {
@@ -453,6 +456,9 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
+		return
+	}

 	successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
 	failed := make([]sqsBatchResultErrorEntry, 0)
@@ -519,6 +525,9 @@ func (s *SQSServer) changeMessageVisibilityBatch(w http.ResponseWriter, r *http.
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
+		return
+	}

 	successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
 	failed := make([]sqsBatchResultErrorEntry, 0)
```
