Commit 9e5ecd1

feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a)
Adds a stateless helper that polls each peer's /sqs_health endpoint and reports whether all advertise the htfifo capability. This is the building block PR 5's CreateQueue gate uses to refuse PartitionCount > 1 until every node in the cluster has the HT-FIFO data plane.

What changes

adapter/sqs_capability_poller.go (new file):
- HTFIFOCapabilityReport: AllAdvertise + per-peer detail.
- HTFIFOCapabilityPeerStatus: address, HasHTFIFO flag, raw capabilities slice, Error string for failure detail.
- PollSQSHTFIFOCapability(ctx, client, peers): polls each peer concurrently. Returns AllAdvertise=false on any timeout, HTTP error, malformed JSON, or missing capability (fail-closed per §8.5).
- Vacuously AllAdvertise=true on empty peer list (caller is responsible for ensuring the peer list is meaningful).
- Per-peer timeout capped by defaultSQSCapabilityPollTimeout (3s) so a single hung peer cannot stall the whole poll.
- Body capped at 1 KiB via io.LimitReader so a misconfigured peer cannot drain memory.
- Bare host:port and full http://… / https://… URLs both accepted; operators can front the endpoint with TLS or a proxy without the helper having to know.

What does NOT change yet

- htfifoCapabilityAdvertised stays false. PR 4-B-3b adds the §8 leadership-refusal hook (startup + per-acquisition observer) and flips this flag.
- CreateQueue does NOT yet call PollSQSHTFIFOCapability: PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the capability check in the same commit.

Tests

adapter/sqs_capability_poller_test.go: 9 top-level tests across the contract surface.
- AllAdvertise happy path with multiple peers.
- One-bad-apple: a peer with empty capabilities drops AllAdvertise.
- Transport failures (HTTP 500, connection refused, malformed JSON) all fail closed with non-empty Error.
- Hung peer respects per-peer timeout; the test bound is well below what a serial poll would take.
- Empty peer list → AllAdvertise vacuously true.
- Empty peer address → fail closed with explicit Error.
- Full-URL peer (http:// or https://) accepted alongside bare host:port.
- Concurrent polling: 5 peers each delaying 200ms must finish in well under 1 second (serial would take 1s+).
- Body-size limit: a 10 KiB response truncated mid-string is surfaced as a JSON parse error, not a half-decoded value.
- TestBuildSQSHealthURL covers the URL construction edge cases.
1 parent bce448f commit 9e5ecd1

2 files changed

Lines changed: 504 additions & 0 deletions

File tree

adapter/sqs_capability_poller.go

Lines changed: 214 additions & 0 deletions
@@ -0,0 +1,214 @@
package adapter

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	json "github.com/goccy/go-json"
)

// HTFIFOCapabilityReport summarises the result of polling each peer's
// /sqs_health endpoint for the htfifo capability. Used by the
// CreateQueue capability gate (Phase 3.D PR 5) and by operator
// tooling that needs to confirm a rolling upgrade has finished
// before enabling partitioned FIFO queues.
//
// AllAdvertise is the binary go/no-go signal for the gate; Peers
// carries per-node detail for log lines and operator triage.
type HTFIFOCapabilityReport struct {
	// AllAdvertise is true iff every peer in the input list
	// returned a /sqs_health body whose `capabilities` array
	// contains the htfifo capability string. False on any timeout,
	// HTTP error, malformed body, or missing capability: the
	// gate fails closed.
	//
	// Vacuously true on an empty peer list. The caller (CreateQueue
	// gate) is responsible for ensuring the peer list reflects the
	// current cluster membership before consulting this report.
	AllAdvertise bool

	// Peers is the per-peer status, indexed in input order. Each
	// entry has HasHTFIFO=true (peer advertised the capability),
	// a non-empty Error (the poll itself failed), or HasHTFIFO=false
	// with an empty Error (peer answered cleanly but did not
	// advertise htfifo). Capabilities is the raw list returned by
	// the peer when the body was parseable.
	Peers []HTFIFOCapabilityPeerStatus
}

// HTFIFOCapabilityPeerStatus is one peer's polling result.
type HTFIFOCapabilityPeerStatus struct {
	// Address is the peer's address exactly as supplied to the
	// poller: a bare host:port or a full http:// / https:// URL.
	Address string

	// HasHTFIFO is true iff the peer's /sqs_health JSON body's
	// capabilities array contained the htfifo capability string.
	HasHTFIFO bool

	// Capabilities is the parsed capabilities array. Nil on any
	// failure before JSON parsing, or non-nil but missing
	// htfifo when the peer is on an older binary.
	Capabilities []string

	// Error is empty on a clean success (HTTP 200 + parseable
	// JSON, regardless of whether HasHTFIFO is true) and non-empty
	// on any failure (transport error, non-200 status, malformed
	// JSON, or context cancellation).
	Error string
}

// defaultSQSCapabilityPollTimeout caps how long the poller waits on
// any single peer. The §8.5 design's "fail-closed default for
// nodes that don't respond within a short timeout" turns into a
// concrete bound here. Callers can shorten the wait by passing a
// context with an earlier deadline; a later deadline does not
// extend the per-peer cap, so a single slow peer cannot stall the
// whole poll.
const defaultSQSCapabilityPollTimeout = 3 * time.Second

// PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint
// concurrently and reports whether all advertise htfifo. The
// helper is stateless: every call dials its peers fresh, so a
// transient network blip on one call does not poison subsequent
// calls.
//
// Per-peer behaviour:
//   - GET http://<peer>/sqs_health with Accept: application/json
//   - Expect HTTP 200 and a parseable JSON body matching
//     {"status":"ok","capabilities":[...]}.
//   - HasHTFIFO is the membership of htfifo in capabilities.
//   - Any failure (transport error, non-200, malformed JSON,
//     timeout, context cancellation) records the reason in Error
//     and leaves HasHTFIFO=false. The poller never returns a
//     fatal error from PollSQSHTFIFOCapability itself; the report
//     carries every per-peer outcome instead.
//
// Concurrency: peers are polled in goroutines; results land via
// an indexed channel so the slice writes are obviously race-free.
//
// Timeouts: each peer poll ends at the earlier of ctx's deadline
// and defaultSQSCapabilityPollTimeout after that poll starts. A
// long ctx deadline does not extend the per-peer cap.
func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []string) *HTFIFOCapabilityReport {
	if client == nil {
		client = http.DefaultClient
	}
	report := &HTFIFOCapabilityReport{
		Peers: make([]HTFIFOCapabilityPeerStatus, len(peers)),
	}
	if len(peers) == 0 {
		// Vacuously: every-of-empty is true. Operator decides
		// whether their peer list is meaningful.
		report.AllAdvertise = true
		return report
	}

	type indexedStatus struct {
		idx    int
		status HTFIFOCapabilityPeerStatus
	}
	results := make(chan indexedStatus, len(peers))
	var wg sync.WaitGroup
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, addr string) {
			defer wg.Done()
			results <- indexedStatus{
				idx:    idx,
				status: pollOneSQSPeerForHTFIFO(ctx, client, addr),
			}
		}(i, peer)
	}
	wg.Wait()
	close(results)

	allAdvertise := true
	for r := range results {
		report.Peers[r.idx] = r.status
		if !r.status.HasHTFIFO {
			allAdvertise = false
		}
	}
	report.AllAdvertise = allAdvertise
	return report
}

// pollOneSQSPeerForHTFIFO polls a single peer's /sqs_health and
// returns its capability status. Any error is captured in the
// returned struct's Error field; this function never returns a
// Go error itself so the caller can map peers to results in one
// pass without checking len(errors).
func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string) HTFIFOCapabilityPeerStatus {
	status := HTFIFOCapabilityPeerStatus{Address: peer}

	if peer == "" {
		status.Error = "empty peer address"
		return status
	}

	pollCtx, cancel := context.WithTimeout(ctx, defaultSQSCapabilityPollTimeout)
	defer cancel()

	url := buildSQSHealthURL(peer)
	req, err := http.NewRequestWithContext(pollCtx, http.MethodGet, url, http.NoBody)
	if err != nil {
		status.Error = errors.Wrapf(err, "build request for %q", peer).Error()
		return status
	}
	req.Header.Set("Accept", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		status.Error = errors.Wrapf(err, "GET %q", url).Error()
		return status
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode)
		return status
	}

	body, err := io.ReadAll(io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
	if err != nil {
		status.Error = errors.Wrapf(err, "read body from %q", url).Error()
		return status
	}

	var parsed sqsHealthBody
	if err := json.Unmarshal(body, &parsed); err != nil {
		status.Error = fmt.Sprintf("malformed JSON from %s: %v", url, err)
		return status
	}

	status.Capabilities = parsed.Capabilities
	for _, c := range parsed.Capabilities {
		if c == sqsCapabilityHTFIFO {
			status.HasHTFIFO = true
			break
		}
	}
	return status
}

// sqsCapabilityMaxBodyBytes caps how much of the /sqs_health
// response we read before bailing. The current body shape is a
// short JSON object; an unbounded read would let a misconfigured
// peer return megabytes. 1 KiB is far above the realistic body
// size and far below "expensive to read".
const sqsCapabilityMaxBodyBytes = 1 << 10

// buildSQSHealthURL prefixes peer with the http:// scheme when the
// caller passed a bare host:port (the common case for
// --raftSqsMap entries). Callers that need https:// can pass the
// fully-qualified URL.
func buildSQSHealthURL(peer string) string {
	if strings.HasPrefix(peer, "http://") || strings.HasPrefix(peer, "https://") {
		return strings.TrimRight(peer, "/") + sqsHealthPath
	}
	return "http://" + peer + sqsHealthPath
}
