Skip to content

Commit 53c687d

Browse files
authored
feat(sqs): Phase 3.B — XML query-protocol support (3-verb proof) (#662)
## Summary Phase **3.B** of [`docs/design/2026_04_24_partial_sqs_compatible_adapter.md`](https://github.com/bootjp/elastickv/blob/main/docs/design/2026_04_24_partial_sqs_compatible_adapter.md) §16.4. Adds the AWS SQS **query protocol** (form-encoded request, XML response) alongside the existing JSON protocol on the same listener — older `aws-sdk-java` v1, `boto < 1.34`, and AWS CLI clients can now talk to elastickv without modification. Detection happens per-request via `Content-Type` / `X-Amz-Target` / `Action` presence; no flag, no separate port. See the new proposal doc committed alongside. This is an **architectural proof PR**: dispatch + decoding + encoding + error envelope + the `*Core` refactor pattern are all in place, with **three verbs** wired end-to-end as concrete proof. Each follow-up verb is a parser + response struct + one switch arm — no further design work needed. ### Verbs in this PR | Verb | Why it's in the proof set | |---|---| | `CreateQueue` | Exercises the `Attribute.N.{Name,Value}` indexed-collection parser. | | `ListQueues` | Exercises the repeated-element XML response shape. | | `GetQueueUrl` | Exercises the `<ErrorResponse>` envelope path via `QueueDoesNotExist`. | Every other verb returns a structured **501 `NotImplementedYet`** XML envelope so operators see the gap explicitly. `SendMessage` / `ReceiveMessage` / `DeleteMessage` are the highest-priority follow-ups (they need the same `*Core` refactor on the FIFO send loop). ### Key design points - **No new listener / no flag.** `pickSqsProtocol(*http.Request)` decides per request. JSON and Query share the SQS port and the SigV4 path. - **Wire-format-free cores.** `createQueue` / `listQueues` / `getQueueUrl` are now `decode → core → encode` with `core(ctx, in) (out, error)`. The JSON wrappers are unchanged in behavior; existing JSON tests pass without modification. - **DoS protection inherited.** Body read is bounded by the same `sqsMaxRequestBodyBytes` the JSON path uses. - **SigV4 unchanged.** The signed canonical request includes the form-encoded body, so the existing SigV4 middleware verifies query requests without code changes. - **Error parity.** `<Code>` reuses the existing `sqsErr*` constants. HTTP status mirrors what the JSON path returns, so SDK retry classifiers work across protocols. - **Cyclomatic budget honoured.** `handle()` was refactored to extract `handleQueryProtocol` — `cyclop ≤ 10` per project rules, no `//nolint`. ### Known limitation (design §11.4) `proxyToLeader`'s error writer always emits the JSON envelope, so a query-protocol client hitting a follower during a leader flip sees one JSON error before retry lands on the new leader. Follow-up PR threads the detected protocol onto the request context so the proxy emits matching XML. ## Test plan - [x] `go build ./...` — clean - [x] `go test -count=1 -race -run "TestSQS|QueryProtocol|TestPickSqs|TestCollectIndexedKV|TestWriteSQSQueryError" ./adapter/` — passes - [x] `golangci-lint run ./adapter/...` — `0 issues.` - [x] `pickSqsProtocol` table tests cover documented edge cases (header precedence, charset suffix, GET with Action, missing Action, nil request). - [x] `collectIndexedKVPairs` tests cover happy path, orphan Name, empty input, unrelated prefix. - [x] End-to-end via the in-process listener: CreateQueue / ListQueues / GetQueueUrl round-trip on the query side. - [x] **Cross-protocol parity**: a queue created via Query is visible via JSON `GetQueueUrl` with the same URL. - [x] Error envelope: 4xx maps to `<Type>Sender</Type>`, 5xx to `<Type>Receiver</Type>`, namespace pinned, `x-amzn-ErrorType` header set. - [x] Unknown verb returns 501 with the `NotImplementedYet` XML envelope. - [x] Missing `Action` parameter returns 400 (per design §3). ## Self-review (5 lenses) 1. **Data loss** — Wire-format change only. Cores are byte-for-byte identical to the previous handler bodies; no Raft / OCC / MVCC code is touched. 2. **Concurrency** — No new shared state. Detection is request-local. Body parsing is bounded. 3. **Performance** — One additional `Content-Type` string compare per request on the dispatch hot path. Negligible. 4. **Data consistency** — `*Core` returns the same business-logic outputs as before; the JSON tests are the regression net for parity. Cross-protocol parity test pins behaviour. 5. **Test coverage** — 10 new test cases cover detection, parsing, envelope shape, and three end-to-end verbs. Existing `TestSQS*` race suite passes on the refactor. ## Stacking This PR is **independent** — branched from current `main` (which has #638 + #649 merged). It does not depend on PR #650 / PR #659. Merge whenever ready.
2 parents f3cb756 + 459242a commit 53c687d

5 files changed

Lines changed: 1288 additions & 29 deletions

File tree

adapter/sqs.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,20 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) {
154154
return
155155
}
156156

157+
// pickSqsProtocol decides between the JSON path (X-Amz-Target +
158+
// JSON body, the existing default) and the query path (form-
159+
// encoded body, XML response) on a per-request basis. See
160+
// docs/design/2026_04_26_proposed_sqs_query_protocol.md for the
161+
// detection rules.
162+
if pickSqsProtocol(r) == sqsProtocolQuery {
163+
s.handleQueryProtocol(w, r)
164+
return
165+
}
166+
// JSON / Unknown both fall through to the JSON path: the JSON-
167+
// style 400 is the most informative error for a client that
168+
// has not picked a codec yet (§3 of the design doc). The
169+
// dispatch table below stays the single decision point.
170+
157171
if r.Method != http.MethodPost {
158172
w.Header().Set("Allow", http.MethodPost)
159173
writeSQSError(w, http.StatusMethodNotAllowed, sqsErrMalformedRequest, "SQS JSON protocol requires POST")
@@ -174,6 +188,27 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) {
174188
handler(w, r)
175189
}
176190

191+
// handleQueryProtocol owns the query-protocol leg of handle(): method
192+
// gating, SigV4 authorisation against the form body, and dispatch
193+
// into per-verb handlers. Pulled out of handle() so the dispatcher
194+
// stays under cyclop=10 even as more wire formats are added.
195+
func (s *SQSServer) handleQueryProtocol(w http.ResponseWriter, r *http.Request) {
196+
// GET is legal for query (some legacy ListQueues callers).
197+
// POST is the common case. Anything else (PUT/DELETE) is
198+
// outside the SQS surface entirely.
199+
if r.Method != http.MethodGet && r.Method != http.MethodPost {
200+
w.Header().Set("Allow", "GET, POST")
201+
writeSQSQueryError(w, newSQSAPIError(http.StatusMethodNotAllowed, sqsErrMalformedRequest,
202+
"SQS query protocol requires GET or POST"))
203+
return
204+
}
205+
if authErr := s.authorizeSQSRequest(r); authErr != nil {
206+
writeSQSQueryError(w, newSQSAPIError(authErr.Status, authErr.Code, authErr.Message))
207+
return
208+
}
209+
s.handleQuery(w, r)
210+
}
211+
177212
func (s *SQSServer) serveHealthz(w http.ResponseWriter, r *http.Request) bool {
178213
if r == nil || r.URL == nil {
179214
return false

adapter/sqs_catalog.go

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -508,30 +508,41 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) {
508508
writeSQSErrorFromErr(w, err)
509509
return
510510
}
511-
if err := validateQueueName(in.QueueName); err != nil {
511+
queueName, err := s.createQueueCore(r.Context(), &in)
512+
if err != nil {
512513
writeSQSErrorFromErr(w, err)
513514
return
514515
}
516+
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)})
517+
}
518+
519+
// createQueueCore is the wire-format-free worker shared by the JSON
520+
// handler above and the query-protocol handler in
521+
// sqs_query_protocol.go (Phase 3.B). Returns the canonical queue
522+
// name on success so each wire wrapper can build its own QueueUrl
523+
// shape (the URL host comes from the request, which is a wire-layer
524+
// concern). Errors keep their typed sqsAPIError so both the JSON and
525+
// XML error envelopes reuse the existing classification path.
526+
func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput) (string, error) {
527+
if err := validateQueueName(in.QueueName); err != nil {
528+
return "", err
529+
}
515530
requested, err := parseAttributesIntoMeta(in.QueueName, in.Attributes)
516531
if err != nil {
517-
writeSQSErrorFromErr(w, err)
518-
return
532+
return "", err
519533
}
520534
if len(in.Tags) > sqsMaxTagsPerQueue {
521535
// AWS caps tags per queue at 50. CreateQueue must reject
522536
// over-cap tag bundles up front; a silent slice-and-store
523537
// would let queues land with more tags than TagQueue would
524538
// ever accept on the same queue.
525-
writeSQSError(w, http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50")
526-
return
539+
return "", newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50")
527540
}
528541
requested.Tags = in.Tags
529-
530-
if err := s.createQueueWithRetry(r.Context(), requested); err != nil {
531-
writeSQSErrorFromErr(w, err)
532-
return
542+
if err := s.createQueueWithRetry(ctx, requested); err != nil {
543+
return "", err
533544
}
534-
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)})
545+
return in.QueueName, nil
535546
}
536547

537548
func (s *SQSServer) createQueueWithRetry(ctx context.Context, requested *sqsQueueMeta) error {
@@ -684,33 +695,46 @@ func (s *SQSServer) listQueues(w http.ResponseWriter, r *http.Request) {
684695
writeSQSErrorFromErr(w, err)
685696
return
686697
}
687-
maxResults := clampListQueuesMaxResults(in.MaxResults)
688-
689-
names, err := s.scanQueueNames(r.Context())
698+
page, nextToken, err := s.listQueuesCore(r.Context(), &in)
690699
if err != nil {
691700
writeSQSErrorFromErr(w, err)
692701
return
693702
}
703+
urls := make([]string, 0, len(page))
704+
for _, n := range page {
705+
urls = append(urls, s.queueURL(r, n))
706+
}
707+
resp := map[string]any{"QueueUrls": urls}
708+
if nextToken != "" {
709+
resp["NextToken"] = nextToken
710+
}
711+
writeSQSJSON(w, resp)
712+
}
713+
714+
// listQueuesCore is the wire-format-free worker shared by the JSON
715+
// handler and the query-protocol handler. Returns the page of queue
716+
// *names* plus the next-page token (empty when not truncated); URL
717+
// construction is a wire-layer concern handled by each wrapper.
718+
func (s *SQSServer) listQueuesCore(ctx context.Context, in *sqsListQueuesInput) ([]string, string, error) {
719+
maxResults := clampListQueuesMaxResults(in.MaxResults)
720+
names, err := s.scanQueueNames(ctx)
721+
if err != nil {
722+
return nil, "", err
723+
}
694724
sort.Strings(names)
695725
filtered := filterByPrefix(names, in.QueueNamePrefix)
696726
start := resolveListQueuesStart(filtered, in.NextToken)
697-
698727
end := start + maxResults
699728
truncated := end < len(filtered)
700729
if !truncated {
701730
end = len(filtered)
702731
}
703732
page := filtered[start:end]
704-
705-
urls := make([]string, 0, len(page))
706-
for _, n := range page {
707-
urls = append(urls, s.queueURL(r, n))
708-
}
709-
resp := map[string]any{"QueueUrls": urls}
733+
var nextToken string
710734
if truncated && len(page) > 0 {
711-
resp["NextToken"] = encodeSQSSegment(page[len(page)-1])
735+
nextToken = encodeSQSSegment(page[len(page)-1])
712736
}
713-
writeSQSJSON(w, resp)
737+
return page, nextToken, nil
714738
}
715739

716740
func clampListQueuesMaxResults(requested int) int {
@@ -796,20 +820,29 @@ func (s *SQSServer) getQueueUrl(w http.ResponseWriter, r *http.Request) {
796820
writeSQSErrorFromErr(w, err)
797821
return
798822
}
799-
if err := validateQueueName(in.QueueName); err != nil {
823+
queueName, err := s.getQueueUrlCore(r.Context(), &in)
824+
if err != nil {
800825
writeSQSErrorFromErr(w, err)
801826
return
802827
}
803-
_, exists, err := s.loadQueueMetaAt(r.Context(), in.QueueName, s.nextTxnReadTS(r.Context()))
828+
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)})
829+
}
830+
831+
// getQueueUrlCore is the wire-format-free worker shared by the JSON
832+
// handler and the query-protocol handler. Returns the validated
833+
// queue name on success; URL construction is a wire-layer concern.
834+
func (s *SQSServer) getQueueUrlCore(ctx context.Context, in *sqsGetQueueUrlInput) (string, error) {
835+
if err := validateQueueName(in.QueueName); err != nil {
836+
return "", err
837+
}
838+
_, exists, err := s.loadQueueMetaAt(ctx, in.QueueName, s.nextTxnReadTS(ctx))
804839
if err != nil {
805-
writeSQSErrorFromErr(w, err)
806-
return
840+
return "", err
807841
}
808842
if !exists {
809-
writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
810-
return
843+
return "", newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
811844
}
812-
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)})
845+
return in.QueueName, nil
813846
}
814847

815848
func (s *SQSServer) getQueueAttributes(w http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)