
feat(sqs): Phase 3.B — XML query-protocol support (3-verb proof)#662

Merged
bootjp merged 2 commits into main from feat/sqs-query-protocol
Apr 26, 2026

@bootjp
Owner

@bootjp bootjp commented Apr 26, 2026

Summary

Phase 3.B of docs/design/2026_04_24_partial_sqs_compatible_adapter.md §16.4. Adds the AWS SQS query protocol (form-encoded request, XML response) alongside the existing JSON protocol on the same listener — older aws-sdk-java v1, boto < 1.34, and AWS CLI clients can now talk to elastickv without modification. Detection happens per-request via Content-Type / X-Amz-Target / Action presence; no flag, no separate port. See the new proposal doc committed alongside.

This is an architectural proof PR: dispatch + decoding + encoding + error envelope + the *Core refactor pattern are all in place, with three verbs wired end-to-end as concrete proof. Each follow-up verb is a parser + response struct + one switch arm — no further design work needed.

Verbs in this PR

| Verb | Why it's in the proof set |
| --- | --- |
| CreateQueue | Exercises the Attribute.N.{Name,Value} indexed-collection parser. |
| ListQueues | Exercises the repeated-element XML response shape. |
| GetQueueUrl | Exercises the <ErrorResponse> envelope path via QueueDoesNotExist. |

Every other verb returns a structured 501 NotImplementedYet XML envelope so operators see the gap explicitly. SendMessage / ReceiveMessage / DeleteMessage are the highest-priority follow-ups (they need the same *Core refactor on the FIFO send loop).

Key design points

  • No new listener / no flag. pickSqsProtocol(*http.Request) decides per request. JSON and Query share the SQS port and the SigV4 path.
  • Wire-format-free cores. createQueue / listQueues / getQueueUrl are now decode → core → encode with core(ctx, in) (out, error). The JSON wrappers are unchanged in behavior; existing JSON tests pass without modification.
  • DoS protection inherited. Body read is bounded by the same sqsMaxRequestBodyBytes the JSON path uses.
  • SigV4 unchanged. The signed canonical request includes the form-encoded body, so the existing SigV4 middleware verifies query requests without code changes.
  • Error parity. <Code> reuses the existing sqsErr* constants. HTTP status mirrors what the JSON path returns, so SDK retry classifiers work across protocols.
  • Cyclomatic budget honoured. handle() was refactored to extract handleQueryProtocol, keeping cyclop ≤ 10 per project rules with no //nolint.

Known limitation (design §11.4)

proxyToLeader's error writer always emits the JSON envelope, so a query-protocol client hitting a follower during a leader flip sees one JSON error before retry lands on the new leader. Follow-up PR threads the detected protocol onto the request context so the proxy emits matching XML.

Test plan

  • go build ./... — clean
  • go test -count=1 -race -run "TestSQS|QueryProtocol|TestPickSqs|TestCollectIndexedKV|TestWriteSQSQueryError" ./adapter/ — passes
  • golangci-lint run ./adapter/... reports 0 issues.
  • pickSqsProtocol table tests cover documented edge cases (header precedence, charset suffix, GET with Action, missing Action, nil request).
  • collectIndexedKVPairs tests cover happy path, orphan Name, empty input, unrelated prefix.
  • End-to-end via the in-process listener: CreateQueue / ListQueues / GetQueueUrl round-trip on the query side.
  • Cross-protocol parity: a queue created via Query is visible via JSON GetQueueUrl with the same URL.
  • Error envelope: 4xx maps to <Type>Sender</Type>, 5xx to <Type>Receiver</Type>, namespace pinned, x-amzn-ErrorType header set.
  • Unknown verb returns 501 with the NotImplementedYet XML envelope.
  • Missing Action parameter returns 400 (per design §3).

Self-review (5 lenses)

  1. Data loss — Wire-format change only. Cores are byte-for-byte identical to the previous handler bodies; no Raft / OCC / MVCC code is touched.
  2. Concurrency — No new shared state. Detection is request-local. Body parsing is bounded.
  3. Performance — One additional Content-Type string compare per request on the dispatch hot path. Negligible.
  4. Data consistency: *Core returns the same business-logic outputs as before; the JSON tests are the regression net for parity. Cross-protocol parity test pins behaviour.
  5. Test coverage — 10 new test cases cover detection, parsing, envelope shape, and three end-to-end verbs. Existing TestSQS* race suite passes on the refactor.

Stacking

This PR is independent — branched from current main (which has #638 + #649 merged). It does not depend on PR #650 / PR #659. Merge whenever ready.

Adds the AWS SQS query protocol (form-encoded request, XML response)
alongside the existing JSON protocol on the same listener. Detected
per-request via Content-Type / X-Amz-Target / Action presence; no
flag, no separate port. See docs/design/2026_04_26_proposed_sqs_query_protocol.md.

Wire-format wrappers reuse existing handler logic via a small
*Core(ctx, in) (out, error) refactor on the three verbs in this PR:

  - createQueueCore     in adapter/sqs_catalog.go
  - listQueuesCore      in adapter/sqs_catalog.go
  - getQueueUrlCore     in adapter/sqs_catalog.go

The JSON wrappers above the cores are unchanged in behaviour (same
input decode -> core -> response write); the existing JSON tests
pass without modification. The query-protocol wrappers in
adapter/sqs_query_protocol.go are the new code.

Verbs in this proof PR:

  - CreateQueue   (Attribute.N parser, single-leaf XML response)
  - ListQueues    (repeated <QueueUrl> XML response)
  - GetQueueUrl   (error envelope path via QueueDoesNotExist)

Every other verb returns 501 NotImplementedYet from the query path
with a structured XML error envelope so operators see the gap
explicitly. Adding each follow-up verb is a parser + response
struct + one switch arm.

Detection rules (pickSqsProtocol):

  - X-Amz-Target header set                            -> JSON
  - Content-Type starts with form-urlencoded           -> Query
  - GET with ?Action=...                               -> Query
  - Content-Type starts with x-amz-json-1.0            -> JSON
  - everything else                                    -> JSON-style 400

Bound by the existing sqsMaxRequestBodyBytes (DoS protection
inherited). Authorisation goes through the same SigV4 path on the
same canonical request — the codec change does not affect signing.

Known limitation (recorded in design §11.4): proxyToLeader's error
writer always emits the JSON envelope, so a query-protocol client
hitting a follower during a leader flip sees one JSON error before
the next request lands on the new leader. Follow-up PR threads the
detected protocol onto the request context.

Verified with:

  - go build ./...
  - go test -count=1 -race -run "TestSQS|QueryProtocol|TestPickSqs|TestCollectIndexedKV|TestWriteSQSQueryError" ./adapter/
  - golangci-lint run ./adapter/...

The handle() dispatcher was refactored to extract handleQueryProtocol
so cyclomatic complexity stays at 10 even as new wire formats land.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces support for the SQS query protocol, enabling form-encoded requests and XML responses. It refactors the core logic for queue creation, listing, and URL retrieval into wire-format-free methods to facilitate reuse between the JSON and query protocols. Review feedback identifies several improvement opportunities, including optimizing the handling of URL query parameters, correcting the indexed key-value pair parser to support different suffixes for Attributes and Tags, and resolving a discrepancy between the implementation and documentation regarding the Request ID format.

Comment thread adapter/sqs_query_protocol.go Outdated
Comment on lines +141 to +144
values := url.Values{}
for k, vs := range r.URL.Query() {
values[k] = append(values[k], vs...)
}

security-high high

Optimization: r.URL.Query() returns a new url.Values map on each call. You can use it directly as the base for values instead of manually copying its entries into a new map. Additionally, ensure that the request body size is limited (e.g., using http.MaxBytesReader) to prevent Denial of Service attacks from oversized payloads.

Suggested change
- values := url.Values{}
- for k, vs := range r.URL.Query() {
- 	values[k] = append(values[k], vs...)
- }
+ values := r.URL.Query()
References
  1. When handling HTTP requests, always limit the size of the request body (e.g., using http.MaxBytesReader) to prevent Denial of Service attacks from oversized payloads.

Comment thread adapter/sqs_query_protocol.go Outdated
Comment on lines +221 to +224
in.Attributes = collectIndexedKVPairs(form, "Attribute")
// Tags follow the same indexed-pair shape as Attributes but with
// a different prefix per the AWS CreateQueue reference.
in.Tags = collectIndexedKVPairs(form, "Tag")

high

The collectIndexedKVPairs helper needs to distinguish between the .Name suffix used for Attributes and the .Key suffix used for Tags in the SQS Query protocol. This change updates the call sites to pass the correct suffix.

Suggested change
- in.Attributes = collectIndexedKVPairs(form, "Attribute")
- // Tags follow the same indexed-pair shape as Attributes but with
- // a different prefix per the AWS CreateQueue reference.
- in.Tags = collectIndexedKVPairs(form, "Tag")
+ in.Attributes = collectIndexedKVPairs(form, "Attribute", ".Name")
+ in.Tags = collectIndexedKVPairs(form, "Tag", ".Key")

Comment on lines +251 to +292
// collectIndexedKVPairs reads AWS-style indexed pairs of the form
//
// <prefix>.1.Name = key1
// <prefix>.1.Value = value1
// <prefix>.2.Name = key2
// <prefix>.2.Value = value2
//
// and returns them as a map. Pairs missing either side are dropped
// silently (AWS does the same — the validation that follows in the
// core handler reports the actual problem). Keys are sorted by
// their integer suffix so a caller that emits pairs out of order
// gets a deterministic map; map iteration order in Go is randomised
// so we don't actually depend on input order, but the sort is cheap
// and keeps the function easy to test by inspection.
func collectIndexedKVPairs(form url.Values, prefix string) map[string]string {
	if len(form) == 0 {
		return nil
	}
	const nameSuffix = ".Name"
	wantPrefix := prefix + "."
	out := map[string]string{}
	for k, vs := range form {
		if !strings.HasPrefix(k, wantPrefix) || !strings.HasSuffix(k, nameSuffix) {
			continue
		}
		if len(vs) == 0 || vs[0] == "" {
			continue
		}
		// k looks like "Attribute.1.Name"; the matching value key is
		// "Attribute.1.Value".
		valueKey := strings.TrimSuffix(k, nameSuffix) + ".Value"
		valueVs, ok := form[valueKey]
		if !ok || len(valueVs) == 0 {
			continue
		}
		out[vs[0]] = valueVs[0]
	}
	if len(out) == 0 {
		return nil
	}
	return out
}

high

The implementation of collectIndexedKVPairs does not actually sort the keys as described in the comment (lines 260-264). Furthermore, the helper is hardcoded to the .Name suffix, which is incorrect for SQS Tags (which use .Key). I've updated the signature to accept the key suffix and removed the misleading comment about sorting, as map iteration order is non-deterministic anyway and the core logic already uses maps.

// collectIndexedKVPairs reads AWS-style indexed pairs of the form
//
//	<prefix>.1.<keySuffix> = key1
//	<prefix>.1.Value = value1
//	<prefix>.2.<keySuffix> = key2
//	<prefix>.2.Value = value2
//
// and returns them as a map. Pairs missing either side are dropped
// silently (AWS does the same — the validation that follows in the
// core handler reports the actual problem).
func collectIndexedKVPairs(form url.Values, prefix string, keySuffix string) map[string]string {
	if len(form) == 0 {
		return nil
	}
	wantPrefix := prefix + "."
	out := map[string]string{}
	for k, vs := range form {
		if !strings.HasPrefix(k, wantPrefix) || !strings.HasSuffix(k, keySuffix) {
			continue
		}
		if len(vs) == 0 || vs[0] == "" {
			continue
		}
		// k looks like "Attribute.1.Name" or "Tag.1.Key"; the matching
		// value key is always suffixed with ".Value".
		valueKey := strings.TrimSuffix(k, keySuffix) + ".Value"
		valueVs, ok := form[valueKey]
		if !ok || len(valueVs) == 0 {
			continue
		}
		out[vs[0]] = valueVs[0]
	}
	if len(out) == 0 {
		return nil
	}
	return out
}
References
  1. Centralize common complex logic, such as retry/backoff mechanisms, into shared helper functions to avoid code duplication and improve maintainability.

Comment thread adapter/sqs_query_protocol.go Outdated
Comment on lines +442 to +443
// shape keeps client logs / support workflows predictable.
func newQueryRequestID() string {

medium

There is a mismatch between the comment and the implementation: 16 random bytes encoded with Base32 (5 bits per char) results in 26 characters (128/5 = 25.6), not 22. If a 22-character ID is required to match AWS's shape, you should use unpadded Base64 (128/6 = 21.33).
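
The length arithmetic in this comment can be checked directly against the standard library's EncodedLen methods. The helper name encodedLengths is made up for this illustration.

```go
package main

import (
	"encoding/base32"
	"encoding/base64"
)

// encodedLengths returns the unpadded Base32 and unpadded Base64 lengths
// for n input bytes, i.e. ceil(8n/5) and ceil(8n/6).
func encodedLengths(n int) (b32, b64 int) {
	b32 = base32.StdEncoding.WithPadding(base32.NoPadding).EncodedLen(n)
	b64 = base64.RawStdEncoding.EncodedLen(n)
	return b32, b64
}
```

For 16 bytes this gives 26 Base32 characters and 22 Base64 characters, matching the figures in the comment; for 14 bytes Base32 yields 23 characters.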


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: db3df09dd9

Comment thread adapter/sqs_query_protocol.go Outdated
Comment on lines +222 to +224
// Tags follow the same indexed-pair shape as Attributes but with
// a different prefix per the AWS CreateQueue reference.
in.Tags = collectIndexedKVPairs(form, "Tag")

P1 Badge Parse query CreateQueue tags with AWS key/value fields

The query CreateQueue parser routes tags through collectIndexedKVPairs, which expects *.Name/*.Value pairs, but AWS query requests encode tags as key/value fields (for example Tag.Key and Tag.Value). In practice this drops user-supplied tags before createQueueCore is called, so queues created over query protocol lose tags and diverge from JSON-path behavior.


Comment thread adapter/sqs_query_protocol.go Outdated
if !ok || len(valueVs) == 0 {
continue
}
out[vs[0]] = valueVs[0]

P2 Badge Make indexed pair parsing deterministic for duplicate names

This assignment happens during iteration over form, which is a Go map, so duplicate logical keys (e.g., two Attribute.N.Name entries both naming DelaySeconds with different values) are resolved by whichever map entry is visited last. That makes the resulting attribute/tag map nondeterministic across requests; the parser should either reject duplicate names or apply a deterministic index-based rule.


…questId length

Address review feedback on PR #662:

1. Tags use .Key/.Value, not .Name/.Value (CodexP1 + Gemini high).
   The AWS SQS query protocol encodes Tags as Tag.N.Key=foo and
   Tag.N.Value=bar, but collectIndexedKVPairs hardcoded .Name. The
   query CreateQueue tag path silently dropped every tag. Made the
   key-field suffix a parameter so Attributes pass "Name" and Tags
   pass "Key", matching the AWS reference.

2. Indexed pair iteration is now deterministic (CodexP2 + Gemini high).
   The previous implementation iterated form (a Go map) so duplicate
   logical keys resolved randomly between calls. Pairs are now
   gathered, sorted by integer index, and flattened lower-index-wins.
   Comment block updated to actually describe the implementation.

3. RequestId is now 22 chars, matching the AWS shape and the
   function's own doc (Gemini medium). Previous 16-byte source
   produced 26 chars. New 14-byte source produces 23 chars trimmed
   to 22.

4. readQueryForm uses r.URL.Query() directly as the base map
   instead of copying entries one-by-one (Gemini high). Saves the
   per-request copy loop on every query-protocol request.

Drive-by refactor: collectIndexedKVPairs split into gather +
flatten halves with a small indexedPairKeyToIdx helper so the
function stays under cyclop=10 even with the new sort + dedup
logic. Per-PR-rule: no //nolint.

Three new tests:
  - TestCollectIndexedKVPairs_TagSuffix pins .Key vs .Name.
  - TestCollectIndexedKVPairs_DeterministicOnDuplicates pins lower-
    index-wins under 64 iterations (catches map randomness).
  - TestNewQueryRequestID_Length pins the AWS 22-char shape.

Verified with go test -race ./adapter/... and golangci-lint
(0 issues, no //nolint).
@bootjp
Owner Author

bootjp commented Apr 26, 2026

Pushed addressing the converging Gemini + Codex findings:

Bugs fixed:

  • Tags now parsed via .Key/.Value (was .Name/.Value) — CodexP1 + Gemini high. Query CreateQueue with tags previously silently dropped them.
  • Indexed pair iteration now deterministic: pairs gathered, sorted by integer index, lower-index-wins on duplicates — CodexP2 + Gemini high.
  • RequestId now actually 22 chars (was 26) — Gemini medium. 14 random bytes -> base32 no-pad -> trim.

Drive-by:

  • readQueryForm adopts r.URL.Query() directly instead of copying — Gemini high.

Refactor:

  • collectIndexedKVPairs split into gather + flatten halves with an indexedPairKeyToIdx helper to keep cyclop ≤ 10 (no //nolint).

3 new tests pin the fixes (Tag suffix, duplicate-key determinism over 64 iters, RequestId length). go test -race clean, golangci-lint run 0 issues.

@claude review

@claude

claude Bot commented Apr 26, 2026

Claude finished @bootjp's task in 2m 12s


Review of fix(sqs/query): addressing Gemini + Codex findings

Todo list:

  • Gather context from trigger comment
  • Read current state of changed files
  • Review Tag suffix fix (.Key vs .Name)
  • Review deterministic indexed pair iteration
  • Review RequestId length fix (22 chars)
  • Review readQueryForm optimization
  • Review collectIndexedKVPairs gather+flatten refactor
  • Check for any remaining issues
  • Post comprehensive review

All four Gemini/Codex findings are correctly addressed. Notes per finding, then a few minor follow-up observations.

Fix 1 — Tag suffix .Key vs .Name

collectIndexedKVPairs now takes a keyField string parameter; parseQueryCreateQueue passes "Name" for Attributes and "Key" for Tags (sqs_query_protocol.go:230-236). TestCollectIndexedKVPairs_TagSuffix confirms that Tag.N.Name entries are rejected by the "Key" path, which is the right regression pin.

One correctness note: the test checks that Tag.3.Name/Tag.3.Value is not picked up when calling collectIndexedKVPairs(form, "Tag", "Key"). That is correct because strings.HasSuffix("Tag.3.Name", ".Key") is false. ✔

Fix 2 — Deterministic pair iteration ✅

The gather → sort → lower-index-wins flatten pattern in collectIndexedKVPairs / gatherIndexedKVPairs / indexedPairKeyToIdx (lines 285–359) is correct. The TestCollectIndexedKVPairs_DeterministicOnDuplicates test runs 64 iterations on a form where index 2 and index 5 both map to "VisibilityTimeout" — index 2 must win every time.

The indexedPairKeyToIdx helper correctly rejects non-integer N segments (e.g. "Attribute.foo.Name" → strconv.Atoi("foo") → false), and multi-dot segments like "Attribute.1.2.Name" → idxStr = "1.2" → Atoi error → false. Edge cases are handled properly.

Fix 3 — RequestId length 22 chars ✅

14 bytes × 8 bits = 112 bits. Base32 at 5 bits/char = ⌈112/5⌉ = 23 raw chars (no padding). Trim to 22. The math checks out: 22 chars × 5 bits = 110 bits of randomness, somewhat less than UUID-v4's 122 bits but still far more than a request ID needs, so entropy is fine. TestNewQueryRequestID_Length pins this over 64 iterations.

One note: base32.StdEncoding produces uppercase A–Z + 2–7. AWS's real SQS RequestId for the query protocol is typically a lowercase UUID (e.g. f5d7-...). This diverges from AWS's wire shape but is not observable to most SDKs: they echo RequestId back in support calls but don't parse its structure. Fine for a proof PR; could be noted as a follow-up if strict wire compatibility is needed.
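
For reference, a generator with the shape discussed here (14 random bytes, unpadded Base32, trimmed to 22 characters) could be sketched as below. The function and constant names are hypothetical, and the PR's newQueryRequestID may handle errors differently.

```go
package main

import (
	"crypto/rand"
	"encoding/base32"
)

const queryRequestIDLen = 22

// newRequestID encodes 14 random bytes as unpadded Base32 (23 characters)
// and keeps the first 22, so every character is in A-Z or 2-7.
func newRequestID() (string, error) {
	var raw [14]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return "", err
	}
	enc := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(raw[:])
	return enc[:queryRequestIDLen], nil
}
```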

Fix 4 — r.URL.Query() optimization ✅

values := r.URL.Query() replaces the manual copy loop (line 153). The returned url.Values is already a fresh map, so subsequent mutation (values[k] = append(...) for the body merge) is safe. ✔

Refactor — gather + flatten split ✅

Splitting collectIndexedKVPairs into three functions keeps each well under cyclop≤10. The indexedKVPair struct (line 313) is minimal and unexported — correct scope. The shape is clean for the follow-up verbs that will reuse the same parser.


Minor observations (non-blocking)

1. make([]indexedKVPair, 0) in gatherIndexedKVPairs (line 326)

var pairs []indexedKVPair is idiomatic Go for a nil-initialised slice and avoids a zero-length allocation. Both are correct; this is cosmetic.

2. Negative index accepted silently

indexedPairKeyToIdx uses strconv.Atoi, which accepts negative numbers. A key like Attribute.-1.Name would produce idx = -1. After sorting, a negative index wins over any positive one. AWS doesn't allow negative indices, but in practice no real SDK emits them. Acceptable for now; worth rejecting with idx < 1 if strict spec compliance is needed later.

3. Missing Action emits JSON, not XML

handleQuery calls writeSQSError (JSON envelope) when Action is empty (line 117). This is documented as intentional per design §3 and tested in TestSQSServer_QueryProtocol_MissingActionReturns400. Just confirming it's deliberate — a query-protocol client that omits Action is badly misconfigured anyway and won't retry on a 400.

4. proxyToLeader JSON-only error on leader flip

Still present as documented in the PR's "Known limitation" section — the follow-up PR wiring the detected protocol onto the request context is the right venue. No action needed here.


Overall: the fixes are correct and the test coverage squarely pins each regression vector. The code is ready to merge on the fix commit. The four observations above are cosmetic or future-work items that don't block this PR.

@bootjp
Owner Author

bootjp commented Apr 26, 2026

Thanks for the thorough re-review on 459242a2. With your "code is ready to merge" verdict and CI green, this PR has reached its exit condition. The four minor non-blocking observations you noted (cosmetic make([]…, 0) → var, accepted negative indices, intentional JSON envelope on missing Action, proxy JSON-only error in the leader-flip window) all match either project conventions or documented design decisions; recording them as follow-up work in the partial doc rather than amending this proof PR.

Please merge at your convenience.

@bootjp bootjp merged commit 53c687d into main Apr 26, 2026
9 checks passed
@bootjp bootjp deleted the feat/sqs-query-protocol branch April 26, 2026 13:42
bootjp added a commit that referenced this pull request Apr 27, 2026
## Summary

Phase 3.C implementation: per-queue rate limiting via token-bucket
throttling. Implements the design from
[`docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md`](https://github.com/bootjp/elastickv/blob/docs/sqs-phase3-proposals/docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md)
(currently on PR #664).

- Token-bucket store on `*SQSServer`: `sync.Map[bucketKey]*tokenBucket`,
per-bucket `sync.Mutex` so cross-queue traffic never serialises on a
process-wide lock. Lazy idle-evict (1h) bounds memory.
- New `sqsQueueMeta.Throttle *sqsQueueThrottle` field with six float64
sub-fields (Send/Recv/Default × Capacity/RefillPerSecond), wired through
the existing `sqsAttributeAppliers` dispatch as `ThrottleSendCapacity` /
`ThrottleSendRefillPerSecond` / etc.
- Validator (`validateThrottleConfig`) enforces the §3.2 cross-attribute
rules: each (capacity, refill) pair both-zero or both-positive; capacity
≥ refill; Send/Recv capacities ≥ 10 (the SendMessageBatch /
DeleteMessageBatch max charge); per-field hard ceiling 100k.
- All seven message-plane handlers wired (SendMessage[Batch],
ReceiveMessage, DeleteMessage[Batch], ChangeMessageVisibility[Batch]).
Throttle check sits **outside** the OCC retry loop per §4.2 — a rejected
request never reaches the coordinator.
- On rejection: `400 Throttling` with `x-amzn-ErrorType` + AWS-shaped
JSON envelope + `Retry-After` header computed from the §3.4 formula
(numerator is requested count, not 1, so a batch verb does not get told
to retry in 1s when it really needs 10s).
- Cache invalidation on `SetQueueAttributes` and `DeleteQueue` (§3.1):
drops every bucket belonging to the queue after the Raft commit so new
limits take effect on the very next request, not after the 1h idle-evict
sweep.
- `GetQueueAttributes("All")` round-trips Throttle* fields so SDKs can
confirm the config landed.

## Test plan

- [x] `adapter/sqs_throttle_test.go` (18 unit tests):
  - bucket math: fresh capacity, refill elapsed, refill cap-at-capacity,
    batch reject preserves partial credit, Retry-After uses requested count,
    sub-1-RPS floor;
  - isolation: per-action, per-queue, Default-fallthrough shares one
    bucket;
  - lifecycle: `invalidateQueue` drops all action keys;
  - concurrency: `-race` clean, exactly-capacity successes;
  - default-off short-circuit;
  - validator: nil/empty canonicalisation, both-zero-or-both-positive,
    capacity ≥ 10 batch floor, Default* exempt, capacity ≥ refill,
    `parseThrottleFloat` range checks, `computeRetryAfter` floor.
- [x] `adapter/sqs_throttle_integration_test.go` (8 end-to-end tests
  against a real `createNode` cluster):
  - default-off allows unbounded;
  - send/recv reject after capacity with correct envelope + `Retry-After`;
  - batch charges by entry count;
  - `SetQueueAttributes` invalidation (raise-and-immediate-success);
  - `DeleteQueue` + `CreateQueue` lifecycle invalidation;
  - `GetQueueAttributes` round-trip;
  - validator rejects below batch min.
- [x] `golangci-lint run` clean.
- [x] `go test -race -run "TestBucketStore|TestValidateThrottleConfig|TestParseThrottleFloat|TestComputeRetryAfter|TestSQSServer_Throttle" ./adapter/...` clean.
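
The validator rules exercised by these tests can be sketched as a per-pair check. This is an illustration, not `validateThrottleConfig` itself: the struct, helper names, and error messages are hypothetical, and only the rules stated in the summary (both-zero-or-both-positive, capacity ≥ refill, batch floor of 10 for Send/Recv, 100k ceiling) are encoded.

```go
package main

import "fmt"

// throttleConfig mirrors the six float64 sub-fields named in the summary.
type throttleConfig struct {
	SendCapacity, SendRefillPerSecond       float64
	RecvCapacity, RecvRefillPerSecond       float64
	DefaultCapacity, DefaultRefillPerSecond float64
}

const (
	batchFloor   = 10      // largest charge a batch verb can make
	fieldCeiling = 100_000 // per-field hard ceiling
)

// validatePair checks one (capacity, refill) pair.
func validatePair(name string, capacity, refill float64, needsBatchFloor bool) error {
	switch {
	case capacity == 0 && refill == 0:
		return nil // pair disabled
	case capacity <= 0 || refill <= 0:
		return fmt.Errorf("%s: capacity and refill must be both zero or both positive", name)
	case capacity > fieldCeiling || refill > fieldCeiling:
		return fmt.Errorf("%s: value exceeds ceiling %d", name, fieldCeiling)
	case capacity < refill:
		return fmt.Errorf("%s: capacity must be >= refill", name)
	case needsBatchFloor && capacity < batchFloor:
		return fmt.Errorf("%s: capacity must be >= %d", name, batchFloor)
	}
	return nil
}

// validateThrottle applies the batch floor to Send and Recv but not Default.
func validateThrottle(c throttleConfig) error {
	if err := validatePair("Send", c.SendCapacity, c.SendRefillPerSecond, true); err != nil {
		return err
	}
	if err := validatePair("Recv", c.RecvCapacity, c.RecvRefillPerSecond, true); err != nil {
		return err
	}
	return validatePair("Default", c.DefaultCapacity, c.DefaultRefillPerSecond, false)
}
```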

## Self-review (5 lenses)

1. **Data loss** — Throttle config is on `sqsQueueMeta`, persisted via
the existing `setQueueAttributesWithRetry` OCC commit. Bucket state is
per-process and never written to storage, by design (§3.1: replicating
bucket state would cost a Raft commit per token decrement, defeating the
bucket). On leader failover the new leader rebuilds at full capacity —
same as the AWS region-failover semantic. No write path can lose
throttle config.
2. **Concurrency** — Per-bucket `sync.Mutex`, never held across
`sync.Map` ops. `LoadOrStore` race on first insert is safe (both racers
compute identical config from the same meta snapshot). Concurrent
`SetQueueAttributes` + in-flight charge: the charge may briefly use the
old bucket; the next request rebuilds from the fresh meta. `-race` test
pins the count invariant: exactly `capacity` successes out of N
concurrent goroutines, never more.
3. **Performance** — Hot path on unconfigured queues is one `nil` check
(`Throttle == nil`) → return. On configured queues: one `sync.Map.Load`
(lock-free), one mutex acquire on the bucket. No global lock.
`sync.Map`'s read-mostly optimisation matches the access pattern. The
lazy idle-evict sweep runs at most once per minute from the hot path so
a many-queue cluster does not pay an O(N) cost per request.
4. **Data consistency** — Throttling sits outside OCC; a rejected
request never touches the coordinator (§4.2). The OCC retry loop in
`sendMessageWithRetry` cannot busy-loop on a permanent rate-limit
failure. Cache invalidation after the Raft commit (not before) so the
rebuilt bucket reads the freshly committed config — there is no window
where a request can read a stale bucket built from the old config.
5. **Test coverage** — 18 unit tests cover bucket math, isolation,
lifecycle, concurrency, default-off; 8 integration tests cover the
end-to-end wire path. Both pass under `-race`. Validator has a dedicated
table-driven test per rule.

## Out of scope (deferred to follow-ups)

- **Prometheus metrics** (counter
`sqs_throttled_requests_total{queue,action}` + gauge
`sqs_throttle_tokens_remaining{queue,action}` per the design's §4.1
monitoring/registry.go entry). The bucket store's `chargeOutcome`
already exposes `tokensAfter` so the gauge wiring is one site; the
counter is a one-line increment after `writeSQSThrottlingError`. Punted
to keep this PR focused on data-plane behaviour.
- **§6.5 cross-protocol (Query) parity test** — the Query protocol layer
lives on PR #662 (separate branch); the throttle envelope already has
the same shape on both protocols (`writeSQSError` handles both), so the
test is a follow-up after #662 lands.
- **§6.6 failover behaviour test** — the §3.1 "fresh bucket on failover"
contract is implementation-correct (the bucket map is per-process, no
Raft state) but a 3-node failover test needs cluster scaffolding and is
best added with the Jepsen workload.

## Branch base

Branched off
[`docs/sqs-phase3-proposals`](https://github.com/bootjp/elastickv/tree/docs/sqs-phase3-proposals)
(PR #664) so the design doc is part of the reviewable surface. Will
rebase against `main` after #664 merges.
bootjp added a commit that referenced this pull request Apr 28, 2026
…FO) (#664)

## Summary

**Docs-only PR.** Two design proposals for the remaining Phase 3 SQS
items per
[`docs/design/2026_04_24_proposed_sqs_compatible_adapter.md`](https://github.com/bootjp/elastickv/blob/main/docs/design/2026_04_24_proposed_sqs_compatible_adapter.md)
Section 14 (Phase 3 bullets). Both items were explicitly called out as
needing separate design docs before any implementation work; this PR
lands those proposals so the implementation PRs have a reviewed
architecture to build on.

### 3.C — Per-queue throttling and tenant fairness
([proposal](docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md))

Per-queue token-bucket throttling configured on queue meta (no separate
keyspace), evaluated at the SQS adapter layer on the leader (no Raft per
request), surfaced as the AWS `Throttling` error envelope so SDK
retry/backoff just works.

Key decisions:
- **Default-off**. Existing queues are unaffected; operators opt in per
queue via `SetQueueAttributes`.
- **Per-action buckets** (Send / Receive / Default) so a slow consumer
cannot pin the producer.
- **Per-leader buckets, no replication**. Worst case on failover: one
extra burst on the new leader. Acceptable per AWS-equivalent behaviour
at region failover boundaries; replicating would cost a Raft commit per
`SendMessage`.
- **Batch verbs charge by entry count**, not call count, with
all-or-nothing rejection (matches AWS).
- **Admin-only configuration plane**. Standard SQS clients see
`InvalidAttributeName` on the `Throttle*` attributes (matches AWS
behaviour for unknown attributes); the data-plane enforcement runs for
everyone.
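
To make the per-action and batch decisions above concrete, here is a small sketch under stated assumptions: the type and function names (`queueThrottle`, `actionBucket`, `chargeBatch`) are invented for illustration, refill is omitted, and a `nil` throttle stands in for the default-off case. It is not the proposal's actual API.

```go
package main

import "sync"

// action selects which bucket a verb charges (names are illustrative).
type action int

const (
	actionSend action = iota
	actionReceive
	actionDefault
	actionCount
)

// actionBucket holds the remaining tokens for one action.
type actionBucket struct {
	mu     sync.Mutex
	tokens int
}

// queueThrottle keeps one bucket per action so a drained Receive bucket
// cannot block Send, and vice versa.
type queueThrottle struct {
	buckets [actionCount]*actionBucket
}

func newQueueThrottle(send, receive, def int) *queueThrottle {
	t := &queueThrottle{}
	t.buckets[actionSend] = &actionBucket{tokens: send}
	t.buckets[actionReceive] = &actionBucket{tokens: receive}
	t.buckets[actionDefault] = &actionBucket{tokens: def}
	return t
}

// chargeBatch charges one token per batch entry. The batch is admitted
// whole or rejected whole; a rejected batch consumes nothing.
func (t *queueThrottle) chargeBatch(a action, entries int) bool {
	if t == nil {
		return true // default-off: unconfigured queues are never throttled
	}
	b := t.buckets[a]
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.tokens < entries {
		return false
	}
	b.tokens -= entries
	return true
}
```

With a Send bucket of 10 tokens, a 7-entry batch is admitted, a following 4-entry batch is rejected without consuming the remaining 3 tokens, and the Receive bucket is untouched throughout.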

### 3.D — Split-queue FIFO (high-throughput FIFO)
([proposal](docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md))

Per-`MessageGroupId` hash partitioning across multiple Raft groups,
mirroring AWS High Throughput FIFO. Within-group ordering preserved;
across-group throughput scales with the partition count.

Key decisions:
- **Existing single-partition FIFO queues stay byte-identical**
(`PartitionCount = 0` path is the legacy layout; no migration runs
implicitly).
- **Power-of-two partition counts only** (1, 2, 4, 8, 16, 32) so the
routing step is `hash & (N-1)` and future offline rebuilds stay
tractable.
- **Partition count is immutable after first SendMessage**. Live
re-partitioning would break ordering for in-flight messages of every
group whose hash bucket changed; out of scope.
- **Multi-PR rollout plan** with an explicit "gate of no return" called
out at PR 5 (the data-plane PR). PRs 1–4 are reversible no-ops on data
layout; once a partitioned FIFO holds real data, rollback means draining
and recreating the queue.
- **FNV-1a hash** (deterministic across processes / Go versions /
architectures). Risk of attacker-controlled `MessageGroupId` pinning all
traffic to one partition is documented and accepted (the feature is for
cooperative operators).
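
The routing step described above might look roughly like the sketch below. It assumes the 32-bit FNV-1a variant from Go's `hash/fnv` (the summary does not say which width the proposal uses), and `partitionFor` and `validPartitionCount` are hypothetical names. A count of 0 is treated as the legacy single-partition layout.

```go
package main

import (
	"errors"
	"hash/fnv"
)

var errInvalidPartitionCount = errors.New("partition count must be a power of two between 1 and 32")

// validPartitionCount accepts 1, 2, 4, 8, 16, 32.
func validPartitionCount(n uint32) bool {
	return n >= 1 && n <= 32 && n&(n-1) == 0
}

// partitionFor maps a MessageGroupId to a partition index. Because n is a
// power of two, masking with n-1 is equivalent to hash % n.
func partitionFor(groupID string, n uint32) (uint32, error) {
	if n == 0 {
		return 0, nil // legacy layout: everything lives in one partition
	}
	if !validPartitionCount(n) {
		return 0, errInvalidPartitionCount
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(groupID)) // hash.Hash writes never return an error
	return h.Sum32() & (n - 1), nil
}
```

Every message in a group hashes to the same index, so within-group ordering stays on one partition. The mask also shows why a non-power-of-two count is rejected: `hash & (N-1)` would skip some partitions entirely.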

## Test plan

- [x] Markdown renders correctly on GitHub (manually previewed).
- [x] Cross-references resolve. The partial-doc rename in PR #659 is in
flight and both proposals reference the partial filename, so the links
will resolve once #659 merges; until then they differ from the current
`_proposed_` filename by a 1-character mismatch that is fine to leave for now.
- [x] No code changes; CI is irrelevant beyond the markdown lint pass.

## Self-review

This is a docs-only PR; the 5-lens self-review collapses to:

1. **Data loss / Concurrency / Performance / Consistency** — N/A, no
code touched.
2. **Test coverage** — N/A, no code touched. The proposals themselves
include a Testing Strategy section (§6 / §9) so the implementation PRs
have explicit acceptance criteria.

## Stacking

This PR is **independent** of #650, #659, and #662. Branched from
current `main`. Merge whenever ready — landing the proposal docs early
lets reviewers comment on the architecture before the implementation PRs
go up.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Documentation**
  * Added detailed designs for optional per-queue token-bucket throttling
    (rules, batch semantics, AWS-shaped Throttling errors with Retry-After,
    metrics, rollout default-off) and for HT‑FIFO “split-queue” partitioning
    (routing, immutability, gating/rollout).
* **New Features**
  * Optional queue-level send/receive throttling with batch-aware charging
    and Retry-After; HT‑FIFO partitioning with deterministic partition
    routing and immutable partition attributes.
* **Tests**
  * Extensive unit and integration tests for throttling, HT‑FIFO
    partitioning, attribute validation, immutability, idempotency, and cache
    invalidation.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->