Commit 166e079

sqs(catalog,capability): idempotent gate placement + sanitized public message (PR #734, round 1)

Two PR #734 review findings addressed in one commit because they touch the same code path:

1. Codex P1 — Check existing queue before enforcing HTFIFO peer gate

   The previous placement of validateHTFIFOCapability inside createQueueCore (before createQueueWithRetry) ran the gate on EVERY CreateQueue call with PartitionCount > 1, including idempotent retries on an already-existing queue with identical attributes. A transient peer poll failure (timeout / unreachable / malformed health) during such a retry would then return InvalidAttributeValue instead of the AWS-correct 200 OK, breaking create-or-get clients during partial outages or rolling upgrades.

   Fix: move the gate INTO tryCreateQueueOnce after the existence check and BEFORE the OCC dispatch. The order in tryCreateQueueOnce is now:

   1. loadQueueMetaAt — check if queue exists at the snapshot
   2. exists + matching attrs → return (true, nil) idempotent OK
   3. exists + different attrs → return QueueNameExists
   4. validateHTFIFOCapability — runs ONLY on the genuine create path
   5. loadQueueGenerationAt + dispatch the create

   The gate may run more than once across OCC retries (each retry that gets to "queue still missing" re-polls), but every retry that hits an existing-queue match short-circuits before the gate runs — so idempotent CreateQueue under a partial cluster outage stays AWS-correct.

   Caller audit: validateHTFIFOCapability has exactly one production caller (now tryCreateQueueOnce); both the JSON handler (createQueue → createQueueCore → createQueueWithRetry → tryCreateQueueOnce) and the query-protocol handler (sqs_query_protocol.go:182 → createQueueCore → …) reach it through that one path. Move is symmetric — no caller observes a semantic change for a queue that genuinely needs to be created; only the existing-queue path stops paying the gate cost.

2. CodeRabbit major — Don't send raw peer poll details back to caller

   buildHTFIFOCapabilityRejection's output (peer addresses + raw poller error text) was returned to the wire layer as the InvalidAttributeValue message, leaking cluster topology to any authenticated CreateQueue caller. This conflicts with the stricter error-redaction policy already used elsewhere in sqs_catalog.go.

   Fix: the wire-level rejection is now the sanitized constant htfifoCapabilityRejectionPublic ("PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; one or more peers did not — see server logs for details"). The full per-peer detail goes to slog.Warn with structured fields (queueName, partitionCount, peerCount, detail) so an operator triaging the rolling upgrade can read the failing peer addresses + reasons from the server logs without rerunning the poll out-of-band.

   Renamed buildHTFIFOCapabilityRejection → formatHTFIFOCapabilityReportForLog to make its server-side-only contract obvious at the call site.

Test changes:

- New TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue (wire-level): creates a partitioned queue on a single-node cluster (gate vacuously passes), poisons leaderSQS with an unreachable address, then re-creates the same queue with identical attrs and expects 200; finally creates a NEW queue with the poisoned peer map and expects the 400 (proves the gate is still in effect for genuine creates).
- New TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails: pins the sanitization contract — the wire-level message must equal htfifoCapabilityRejectionPublic exactly, never contain a peer host:port.
- Updated TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability and TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable to assert the sanitized constant + NotContains on the peer address.
- Renamed TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage → TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail to match the renamed helper; assertion that the helper output is server-side-only (no client wire surface assertion here).

Below threshold (intentionally not addressed in this round):

- Gemini medium on collectSQSPeers concurrency: leaderSQS is only mutated at SQSServer construction (WithSQSLeaderMap), not at request time. Gemini's own comment acknowledges this.
- Gemini medium on caching the capability status: CreateQueue is a rare control-plane operation; caching adds a stale-window failure mode (a cluster that already lost a peer would still accept a partitioned queue while the cache is warm). Pure performance suggestion, not correctness.
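The ordering described in finding 1 (existence check, idempotent match, attribute conflict, gate, create) can be sketched in isolation. This is a minimal illustration, not the adapter's code: `queueMeta`, `catalog`, `tryCreateOnce`, and both error values are hypothetical stand-ins, the catalog is an in-memory map rather than an OCC store, and the gate is a plain callback instead of a peer poll.

```go
package main

import (
	"errors"
	"fmt"
)

// queueMeta is a hypothetical stand-in for the attributes compared on CreateQueue.
type queueMeta struct {
	Name           string
	PartitionCount uint32
}

// Hypothetical error values standing in for the wire-level error codes.
var (
	errQueueNameExists = errors.New("QueueNameExists")
	errGateRejected    = errors.New("InvalidAttributeValue")
)

// catalog is a toy in-memory queue catalog. gate stands in for the
// cluster-wide capability poll and may fail at any time.
type catalog struct {
	queues map[string]queueMeta
	gate   func(queueMeta) error
}

// tryCreateOnce follows the ordering from the commit message:
// existence check first, gate only on the genuine create path.
func (c *catalog) tryCreateOnce(req queueMeta) error {
	if existing, ok := c.queues[req.Name]; ok {
		if existing == req {
			return nil // idempotent OK: the gate never runs
		}
		return errQueueNameExists
	}
	if req.PartitionCount > 1 {
		if err := c.gate(req); err != nil {
			return err // rejected before anything is written
		}
	}
	c.queues[req.Name] = req
	return nil
}

func main() {
	peersHealthy := true
	c := &catalog{
		queues: map[string]queueMeta{},
		gate: func(queueMeta) error {
			if !peersHealthy {
				return errGateRejected
			}
			return nil
		},
	}
	q := queueMeta{Name: "orders.fifo", PartitionCount: 4}

	fmt.Println("first create:", c.tryCreateOnce(q))
	peersHealthy = false // simulate a peer outage
	fmt.Println("idempotent retry:", c.tryCreateOnce(q))
	fmt.Println("new queue:", c.tryCreateOnce(queueMeta{Name: "new.fifo", PartitionCount: 4}))
}
```

With the gate placed after the existence check, the retry on `orders.fifo` succeeds even while the simulated peers are down, and only the genuinely new queue is rejected.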
1 parent df11d2b commit 166e079

4 files changed

Lines changed: 180 additions & 43 deletions

adapter/sqs_capability_gate.go

Lines changed: 32 additions & 10 deletions
@@ -2,11 +2,21 @@ package adapter

 import (
 	"context"
+	"log/slog"
 	"net/http"
 	"sort"
 	"strings"
 )

+// htfifoCapabilityRejectionPublic is the sanitized client-facing
+// reason returned from validateHTFIFOCapability when the cluster
+// poll fails. Per CodeRabbit major review: peer addresses and raw
+// poller error text MUST NOT leak to authenticated clients (the
+// CreateQueue surface is part of the public AWS-shaped API), so
+// the wire-level message is intentionally generic and the per-peer
+// detail goes to slog.Warn for operator triage.
+const htfifoCapabilityRejectionPublic = "PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; one or more peers did not — see server logs for details"
+
 // validateHTFIFOCapability is the §11 PR 5b-3 gate that replaced the
 // PR 2 dormancy reject. CreateQueue calls this on every request; it
 // is a no-op for legacy / single-partition meta and the full
@@ -53,8 +63,18 @@ func (s *SQSServer) validateHTFIFOCapability(ctx context.Context, requested *sqs
 	}
 	report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{})
 	if report == nil || !report.AllAdvertise {
+		// Log the full per-peer detail for operator triage. The
+		// client-visible message stays generic (no peer addresses,
+		// no raw poller error text) so the CreateQueue surface
+		// does not leak cluster topology to authenticated callers
+		// — CodeRabbit major review on PR #734.
+		slog.Warn("sqs: htfifo capability gate rejected partitioned CreateQueue",
+			"queueName", requested.Name,
+			"partitionCount", requested.PartitionCount,
+			"peerCount", len(peers),
+			"detail", formatHTFIFOCapabilityReportForLog(report))
 		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
-			buildHTFIFOCapabilityRejection(report))
+			htfifoCapabilityRejectionPublic)
 	}
 	return nil
 }
@@ -86,15 +106,17 @@ func (s *SQSServer) collectSQSPeers() []string {
 	return peers
 }

-// buildHTFIFOCapabilityRejection composes the operator-facing message
-// for a failed capability poll. Lists the peers that did not
-// advertise htfifo (with the per-peer Error or "missing capability"
-// reason) so the operator can fix the rolling-upgrade lag without
-// rerunning the poll out-of-band. Order matches report.Peers, which
-// matches collectSQSPeers' sorted input order — deterministic.
-func buildHTFIFOCapabilityRejection(report *HTFIFOCapabilityReport) string {
+// formatHTFIFOCapabilityReportForLog composes the per-peer detail
+// surfaced to slog.Warn when the gate rejects. Lists each failing
+// peer's address and reason (per-peer Error or "missing capability")
+// so an operator triaging a partial-rolling-upgrade cluster can fix
+// the lag from the server logs without rerunning the poll
+// out-of-band. NEVER returned to the client — that path uses the
+// sanitized htfifoCapabilityRejectionPublic constant. Order matches
+// report.Peers, which matches collectSQSPeers' sorted input order —
+// deterministic so log lines diff cleanly across runs.
+func formatHTFIFOCapabilityReportForLog(report *HTFIFOCapabilityReport) string {
 	var b strings.Builder
-	b.WriteString("PartitionCount > 1 requires every cluster peer to advertise the htfifo capability via /sqs_health; the following peers did not: ")
 	if report == nil {
 		b.WriteString("(no report)")
 		return b.String()
@@ -120,7 +142,7 @@ func buildHTFIFOCapabilityRejection(report *HTFIFOCapabilityReport) string {
 	if first {
 		// Defensive: AllAdvertise was false but no peer surfaced a
 		// reason. Should never happen, but emit a non-empty hint
-		// rather than a truncated message ending in a colon.
+		// rather than a truncated empty string.
 		b.WriteString("(unknown peer)")
 	}
 	return b.String()
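The split between a generic client-facing message and a detailed server-side log line can be sketched on its own. This is a minimal illustration under stated assumptions: `publicRejection`, `peerStatus`, `formatForLog`, `reject`, and `runRejection` are hypothetical names, the `", "` separator is a guess at a reasonable layout rather than the helper's real output format, and the logger writes to a buffer so the example needs no external setup. Only `log/slog` calls from the Go standard library (Go 1.21 or later) are used.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"log/slog"
	"strings"
)

// publicRejection is a hypothetical generic message: no addresses, no raw errors.
const publicRejection = "one or more peers did not advertise the required capability; see server logs for details"

// peerStatus is a hypothetical per-peer poll result.
type peerStatus struct {
	Address string
	OK      bool
	Err     string // transport error text, empty if the peer answered
}

// formatForLog lists only the failing peers as "addr (reason)".
// Intended for server logs, never for the client response.
func formatForLog(peers []peerStatus) string {
	var parts []string
	for _, p := range peers {
		if p.OK {
			continue
		}
		reason := p.Err
		if reason == "" {
			reason = "missing capability"
		}
		parts = append(parts, fmt.Sprintf("%s (%s)", p.Address, reason))
	}
	if len(parts) == 0 {
		return "(unknown peer)"
	}
	return strings.Join(parts, ", ")
}

// reject logs the per-peer detail and returns only the generic message.
func reject(logger *slog.Logger, queue string, peers []peerStatus) error {
	logger.Warn("capability gate rejected CreateQueue",
		"queueName", queue,
		"peerCount", len(peers),
		"detail", formatForLog(peers))
	return errors.New(publicRejection)
}

// runRejection captures what the client and the server log each receive.
func runRejection(peers []peerStatus) (clientMsg, serverLog string) {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))
	err := reject(logger, "orders.fifo", peers)
	return err.Error(), buf.String()
}

func main() {
	peers := []peerStatus{
		{Address: "ok:9000", OK: true},
		{Address: "old:9000"},
		{Address: "down:9000", Err: "dial tcp: refused"},
	}
	clientMsg, serverLog := runRejection(peers)
	fmt.Println("client sees:", clientMsg)
	fmt.Print("server log: ", serverLog)
}
```

Keeping the formatter separate from the returned error makes it straightforward to assert in tests that the client message equals the constant while the detail appears only in the log.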

adapter/sqs_capability_gate_test.go

Lines changed: 51 additions & 19 deletions
@@ -112,10 +112,10 @@ func TestValidateHTFIFOCapability_RejectsWhenOnePeerLacksCapability(t *testing.T
 	require.True(t, ok, "must surface as sqsAPIError so the wire layer maps to InvalidAttributeValue, got %T", err)
 	require.Equal(t, http.StatusBadRequest, apiErr.status)
 	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
-	require.Contains(t, apiErr.message, "every cluster peer to advertise the htfifo capability",
-		"message must explain the gate so the operator knows what to fix")
-	require.Contains(t, apiErr.message, oldAddr,
-		"the offending peer must appear in the message, got %q", apiErr.message)
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant — peer addresses live in server logs (CodeRabbit major)")
+	require.NotContains(t, apiErr.message, oldAddr,
+		"peer host:port MUST NOT leak through the wire-level rejection")
 }

 // TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable pins
@@ -145,8 +145,10 @@ func TestValidateHTFIFOCapability_RejectsWhenPeerUnreachable(t *testing.T) {
 	require.True(t, ok)
 	require.Equal(t, http.StatusBadRequest, apiErr.status)
 	require.Equal(t, sqsErrInvalidAttributeValue, apiErr.errorType)
-	require.Contains(t, apiErr.message, deadAddr,
-		"unreachable peer must be named in the rejection message")
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant — transport error text lives in server logs (CodeRabbit major)")
+	require.NotContains(t, apiErr.message, deadAddr,
+		"peer host:port MUST NOT leak through the wire-level rejection")
 }

 // TestCollectSQSPeers_Deterministic pins the helper's order +
@@ -174,11 +176,15 @@ func TestCollectSQSPeers_Deterministic(t *testing.T) {
 	require.Empty(t, (&SQSServer{}).collectSQSPeers())
 }

-// TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage pins the
-// rejection-message shape so a future refactor cannot accidentally
-// truncate the per-peer detail. Each failing peer must contribute
-// a "(reason)" suffix; peers that pass do not appear at all.
-func TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage(t *testing.T) {
+// TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail
+// pins the SERVER-SIDE log helper's shape — never returned to the
+// client (CodeRabbit major review on PR #734: peer addresses + raw
+// poller errors leak cluster topology to authenticated callers).
+// Each failing peer must contribute a "(reason)" suffix so an
+// operator triaging a rolling-upgrade cluster can fix the lag from
+// the log lines without rerunning the poll out-of-band; peers that
+// pass do not appear.
+func TestFormatHTFIFOCapabilityReportForLog_ShapesServerSideDetail(t *testing.T) {
 	t.Parallel()

 	report := &HTFIFOCapabilityReport{
@@ -189,15 +195,41 @@ func TestBuildHTFIFOCapabilityRejection_ShapesOperatorMessage(t *testing.T) {
 		},
 	}

-	msg := buildHTFIFOCapabilityRejection(report)
-	require.Contains(t, msg, "every cluster peer to advertise the htfifo capability")
-	require.NotContains(t, msg, "ok:9000", "advertising peers must NOT appear in the rejection")
-	require.Contains(t, msg, "old:9000 (missing capability)")
-	require.Contains(t, msg, "down:9000 (dial tcp: refused)")
+	detail := formatHTFIFOCapabilityReportForLog(report)
+	require.NotContains(t, detail, "ok:9000", "advertising peers must NOT appear in the log detail")
+	require.Contains(t, detail, "old:9000 (missing capability)")
+	require.Contains(t, detail, "down:9000 (dial tcp: refused)")

 	// Defensive: nil report and "all-passing-but-AllAdvertise-false" path.
-	require.Contains(t, buildHTFIFOCapabilityRejection(nil), "no report")
+	require.Contains(t, formatHTFIFOCapabilityReportForLog(nil), "no report")
 	allPass := &HTFIFOCapabilityReport{Peers: []HTFIFOCapabilityPeerStatus{{Address: "x", HasHTFIFO: true}}}
-	require.Contains(t, buildHTFIFOCapabilityRejection(allPass), "unknown peer",
-		"never emit a truncated 'did not: ' tail when no peer details surface")
+	require.Contains(t, formatHTFIFOCapabilityReportForLog(allPass), "unknown peer",
+		"never emit an empty detail when no peer reasons surface")
+}
+
+// TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails
+// pins the CodeRabbit major review's redaction contract: the
+// client-visible InvalidAttributeValue message MUST NOT include
+// peer addresses or raw poller error text. The two failure-path
+// tests above check that the gate REJECTS; this test specifically
+// checks that the rejection message is the sanitized
+// htfifoCapabilityRejectionPublic constant — no host:port, no raw
+// transport error.
+func TestValidateHTFIFOCapability_PublicMessageDoesNotLeakPeerDetails(t *testing.T) {
+	t.Parallel()
+
+	old := htfifoCapabilityServer(t, []string{}) // no htfifo
+	oldAddr := strings.TrimPrefix(old.URL, "http://")
+	s := &SQSServer{leaderSQS: map[string]string{"raft1": oldAddr}}
+
+	err := s.validateHTFIFOCapability(context.Background(), &sqsQueueMeta{Name: "q.fifo", PartitionCount: 4})
+	require.Error(t, err)
+	var apiErr *sqsAPIError
+	require.True(t, errors.As(err, &apiErr))
+	require.Equal(t, htfifoCapabilityRejectionPublic, apiErr.message,
+		"client message must be the sanitized constant, never the per-peer detail")
+	require.NotContains(t, apiErr.message, oldAddr,
+		"peer host:port MUST NOT appear in the wire-level rejection — operator detail is server-side only")
+	require.Contains(t, apiErr.message, "see server logs for details",
+		"public message must point operators at the server-side detail")
 }

adapter/sqs_catalog.go

Lines changed: 27 additions & 14 deletions
@@ -481,9 +481,10 @@ var sqsAttributeAppliers = map[string]attributeApplier{
 	// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set
 	// at CreateQueue time; SetQueueAttributes attempts to change it
 	// reject via the immutability check in trySetQueueAttributesOnce.
-	// PR 2 of the rollout introduces the field but the temporary
-	// dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1
-	// until PR 5 lifts the gate atomically with the data plane.
+	// PartitionCount > 1 is gated by validateHTFIFOCapability (the
+	// cluster-wide htfifo capability poll, PR 5b-3) which runs
+	// inside tryCreateQueueOnce after the existence check so
+	// idempotent retries do not pay the network cost.
 	"PartitionCount": func(m *sqsQueueMeta, v string) error {
 		// Parse at uint64 width and bound-check explicitly so the
 		// uint32 narrowing below is gosec-clean.
@@ -903,17 +904,12 @@ func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput
 	if err != nil {
 		return "", err
 	}
-	// Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3,
-	// replaces the PR 2 dormancy reject). PartitionCount > 1 is
-	// rejected unless this binary AND every peer in s.leaderSQS
-	// advertise the htfifo capability via /sqs_health. The gate
-	// fails closed on any peer timeout, HTTP error, malformed body,
-	// or missing capability so a partitioned queue cannot land in a
-	// partially-upgraded cluster where some peer would silently
-	// store its records under the legacy single-partition keyspace.
-	if err := s.validateHTFIFOCapability(ctx, requested); err != nil {
-		return "", err
-	}
+	// The cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3)
+	// runs INSIDE tryCreateQueueOnce after the existence check, not
+	// here. Idempotent CreateQueue retries on an already-existing
+	// partitioned queue must NOT pay the network poll cost or risk a
+	// transient peer-poll failure flipping a 200-OK retry into a 400
+	// (Codex P1 review on PR #734).
 	if len(in.Tags) > sqsMaxTagsPerQueue {
 		// AWS caps tags per queue at 50. CreateQueue must reject
 		// over-cap tag bundles up front; a silent slice-and-store
@@ -963,6 +959,23 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM
 		}
 		return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueNameExists, "queue already exists with different attributes")
 	}
+	// Cluster-wide htfifo capability gate (Phase 3.D §11 PR 5b-3,
+	// replaces the PR 2 dormancy reject). PartitionCount > 1 is
+	// rejected unless this binary AND every peer in s.leaderSQS
+	// advertise the htfifo capability via /sqs_health. Fails closed
+	// on any peer timeout, HTTP error, malformed body, or missing
+	// capability so a partitioned queue cannot land in a partially-
+	// upgraded cluster.
+	//
+	// Runs AFTER the existence check (so idempotent CreateQueue
+	// retries on an already-existing partitioned queue do not pay
+	// the network poll cost or risk a transient peer-poll failure
+	// flipping a 200-OK retry into a 400 — Codex P1 review on PR
+	// #734) and BEFORE the dispatch (so a rejected create does not
+	// burn an OCC commit).
+	if err := s.validateHTFIFOCapability(ctx, requested); err != nil {
+		return false, err
+	}
 	lastGen, err := s.loadQueueGenerationAt(ctx, requested.Name, readTS)
 	if err != nil {
 		return false, errors.WithStack(err)
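The "fails closed" behaviour named in the gate comment (timeout, HTTP error, malformed body, or missing capability all count as a rejection) can be illustrated with a small sketch. Everything here is an assumption made for illustration: the JSON shape in `healthBody` is a hypothetical guess at what a health endpoint might return, not the real /sqs_health payload, and `fetchFunc`, `staticFetch`, and `allAdvertise` are invented names. The fetch step is abstracted behind a function so the example runs without a network.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// healthBody is a HYPOTHETICAL health payload shape, assumed for this sketch.
type healthBody struct {
	Capabilities []string `json:"capabilities"`
}

// fetchFunc abstracts the per-peer health request so no network is needed.
type fetchFunc func(addr string) ([]byte, error)

// staticFetch serves canned bodies; unknown addresses behave as unreachable.
func staticFetch(bodies map[string]string) fetchFunc {
	return func(addr string) ([]byte, error) {
		body, ok := bodies[addr]
		if !ok {
			return nil, errors.New("peer unreachable: " + addr)
		}
		return []byte(body), nil
	}
}

// allAdvertise returns true only if every peer answers with a well-formed
// body that lists the capability. Any failure counts as "does not advertise".
func allAdvertise(peers []string, capability string, fetch fetchFunc) bool {
	for _, addr := range peers {
		raw, err := fetch(addr)
		if err != nil {
			return false // unreachable or timed out: fail closed
		}
		var body healthBody
		if err := json.Unmarshal(raw, &body); err != nil {
			return false // malformed body: fail closed
		}
		found := false
		for _, c := range body.Capabilities {
			if c == capability {
				found = true
				break
			}
		}
		if !found {
			return false // peer answered but lacks the capability
		}
	}
	return true // also true for an empty peer list (single-node case)
}

func main() {
	fetch := staticFetch(map[string]string{
		"new:9000": `{"capabilities":["htfifo"]}`,
		"old:9000": `{"capabilities":[]}`,
		"bad:9000": `not json`,
	})
	fmt.Println(allAdvertise(nil, "htfifo", fetch))
	fmt.Println(allAdvertise([]string{"new:9000"}, "htfifo", fetch))
	fmt.Println(allAdvertise([]string{"new:9000", "old:9000"}, "htfifo", fetch))
	fmt.Println(allAdvertise([]string{"new:9000", "bad:9000"}, "htfifo", fetch))
	fmt.Println(allAdvertise([]string{"new:9000", "gone:9000"}, "htfifo", fetch))
}
```

Treating every failure mode as a rejection is what makes the placement after the existence check matter: a fail-closed gate that ran on idempotent retries would turn transient peer trouble into client-visible errors.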

adapter/sqs_partitioning_integration_test.go

Lines changed: 70 additions & 0 deletions
@@ -4,6 +4,8 @@ import (
 	"net/http"
 	"strings"
 	"testing"
+
+	"github.com/stretchr/testify/require"
 )

 // TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode pins the
@@ -35,6 +37,74 @@ func TestSQSServer_HTFIFO_CapabilityGate_AcceptsOnSingleNode(t *testing.T) {
 	}
 }

+// TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue
+// pins the Codex P1 review fix on PR #734: a CreateQueue retry on
+// an already-existing partitioned queue with identical attributes
+// MUST return 200 (idempotent) even when the cluster-wide
+// capability poll would now fail. Before the fix, the gate ran
+// before the existence check, so a transient peer outage during
+// a CreateQueue retry would flip a 200-OK idempotent response
+// into a 400. Now the gate runs INSIDE tryCreateQueueOnce after
+// the existence check; an existing queue with matching attrs
+// short-circuits to 200 and never touches the network.
+//
+// The test creates a partitioned queue on a single-node cluster
+// (gate passes vacuously), then poisons the SQSServer's peer
+// map with an unreachable address so any subsequent gate
+// invocation would reject. The second CreateQueue with identical
+// attrs must still succeed; only an actually-new partitioned
+// queue would now fail the gate.
+func TestSQSServer_HTFIFO_CapabilityGate_IsIdempotentOnExistingQueue(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	attrs := map[string]string{
+		"FifoQueue": "true",
+		"PartitionCount": "4",
+	}
+
+	// First create — succeeds because the leaderSQS map is empty
+	// and the local binary advertises htfifo (vacuous gate).
+	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName": "htfifo-idempotent.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusOK, status,
+		"first create on single-node cluster must succeed: body=%v", out)
+
+	// Poison the peer map with an unreachable address. Any gate
+	// invocation from this point would call PollSQSHTFIFOCapability
+	// against this dead peer and fail closed → 400. The
+	// post-fix code path skips the gate on an existing-queue
+	// match, so the next create must STILL succeed.
+	require.NotNil(t, node.sqsServer)
+	node.sqsServer.leaderSQS = map[string]string{
+		"raft-fake": "127.0.0.1:1", // unreachable: connection refused
+	}
+
+	status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName": "htfifo-idempotent.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusOK, status,
+		"second create with identical attrs must be idempotent (gate must NOT run on existing-queue match) — Codex P1 PR #734; body=%v", out)
+
+	// Sanity: the gate IS still in effect for genuinely new
+	// partitioned queues — a different name with the poisoned
+	// peer map must fail the gate.
+	status, out = callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName": "htfifo-newqueue.fifo",
+		"Attributes": attrs,
+	})
+	require.Equal(t, http.StatusBadRequest, status,
+		"new partitioned queue must hit the gate when a peer is unreachable: body=%v", out)
+	if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue {
+		t.Fatalf("new queue: __type=%q (expected InvalidAttributeValue)", got)
+	}
+}
+
 // TestSQSServer_HTFIFO_CapabilityGate_AllowsPartitionCountOne pins
 // the no-op-partition-count path: PartitionCount=1 is the legacy
 // single-partition layout and bypasses the capability gate
