Commit e9f33eb

feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a) (#721)
## Summary

Phase 3.D PR 4-B-3a: adds the stateless `htfifo` capability poller that PR 5's CreateQueue gate consumes. Stacks on the now-merged #715 (PR 4-B-2, partition resolver). This PR is purely additive: new helper file, new test file, no existing code touched. Next is PR 4-B-3b (leadership-refusal hook + flag flip).

## What's added

- `adapter/sqs_capability_poller.go`:
  - `HTFIFOCapabilityReport{AllAdvertise, Peers}`: binary go/no-go signal plus per-peer detail for operator triage.
  - `HTFIFOCapabilityPeerStatus{Address, HasHTFIFO, Capabilities, Error}`: one peer's polling result.
  - `PollSQSHTFIFOCapability(ctx, peers, cfg)`: concurrent goroutine-per-peer poll with indexed-channel result aggregation (race-free).
  - Per-peer timeout `defaultSQSCapabilityPollTimeout = 3s` so a single hung peer can't stall the cluster-wide poll.
  - Body capped at 1 KiB via `io.LimitReader` so a misconfigured peer can't drain memory.
  - Bare `host:port` and full `http://…` / `https://…` URLs both accepted.
  - Fail-closed on every failure mode: timeout, transport error, non-200, malformed JSON, missing capability. Empty peer list → vacuously `AllAdvertise=true` (caller validates list completeness).

## What's NOT added (deferred)

- `htfifoCapabilityAdvertised` stays `false`. PR 4-B-3b adds the §8 leadership-refusal hook + per-acquisition observer in `kv/raftengine/etcd` and flips the flag.
- `CreateQueue` does NOT yet call this helper. PR 5 lifts the `PartitionCount > 1` dormancy gate AND wires the capability check in the same commit (per the §11 rollout plan's "gate-and-lift atomically" rule).

## Test plan

9 top-level tests covering the contract surface:

- [x] `TestPollSQSHTFIFOCapability_AllAdvertise`: happy path, multiple peers.
- [x] `TestPollSQSHTFIFOCapability_OneMissingFailsClosed`: old-binary peer with empty capabilities drops `AllAdvertise`.
- [x] `TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed`: HTTP 500, connection refused, malformed JSON all surface as `Error`.
- [x] `TestPollSQSHTFIFOCapability_TimeoutFailsClosed`: hung peer respects per-peer timeout, full poll bounded.
- [x] `TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue`: empty peer list contract.
- [x] `TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed`: `""` entry in peers slice surfaces explicit Error.
- [x] `TestPollSQSHTFIFOCapability_FullURLPeer`: `http://` and `https://` URLs accepted alongside bare `host:port`.
- [x] `TestPollSQSHTFIFOCapability_ConcurrentPolling`: 5×200ms peers finish in well under 1s.
- [x] `TestPollSQSHTFIFOCapability_RespectsBodyLimit`: 10 KiB response truncated mid-string surfaces as JSON parse error, not garbage decode.
- [x] `TestBuildSQSHealthURL`: URL construction edge cases.
- [x] `go test -race ./adapter/...` pass.
- [x] `golangci-lint ./adapter/...` clean.

## Self-review (per CLAUDE.md)

1. **Data loss**: read-only HTTP poll; no FSM/Pebble/retention path. No issue.
2. **Concurrency / distributed failures**: peer polls run in independent goroutines; results land via an indexed channel so slice writes are obviously race-free. Per-peer timeout enforced via `context.WithTimeout` so a slow peer can't stall the rest. Body capped via `io.LimitReader`. No issue.
3. **Performance**: N peers polled concurrently, not serially; the test pins this. Per-peer cost is one HTTP round-trip + a JSON parse of a tiny body. No hot-path impact (CreateQueue is a control-plane operation, not request hot path). No issue.
4. **Data consistency**: fail-closed on every failure mode preserves the §8.5 "any peer that doesn't respond is treated as not-yet-upgraded" rule. The vacuously-true empty-peer-list case is documented and the caller's responsibility. No issue.
5. **Test coverage**: every documented failure path (HTTP error, transport error, JSON parse, timeout, missing capability, empty peer, body-size cap) is pinned. Concurrent polling is pinned (would have caught a regression to serial). URL construction edges pinned.
2 parents bce448f + 2d9e06d commit e9f33eb

2 files changed

Lines changed: 590 additions & 0 deletions

adapter/sqs_capability_poller.go

Lines changed: 258 additions & 0 deletions
@@ -0,0 +1,258 @@
package adapter

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	json "github.com/goccy/go-json"
)

// HTFIFOCapabilityReport summarises the result of polling each peer's
// /sqs_health endpoint for the htfifo capability. Used by the
// CreateQueue capability gate (Phase 3.D PR 5) and by operator
// tooling that needs to confirm a rolling upgrade has finished
// before enabling partitioned FIFO queues.
//
// AllAdvertise is the binary go/no-go signal for the gate; Peers
// carries per-node detail for log lines and operator triage.
type HTFIFOCapabilityReport struct {
	// AllAdvertise is true iff every peer in the input list
	// returned a /sqs_health body whose `capabilities` array
	// contains the htfifo capability string. False on any timeout,
	// HTTP error, malformed body, or missing capability: the
	// gate fails closed.
	//
	// Vacuously true on an empty peer list. The caller (CreateQueue
	// gate) is responsible for ensuring the peer list reflects the
	// current cluster membership before consulting this report.
	AllAdvertise bool

	// Peers is the per-peer status, indexed in input order. Each
	// entry has HasHTFIFO=true (peer advertised the capability),
	// a non-empty Error explaining why the poll failed, or
	// HasHTFIFO=false with an empty Error (peer answered cleanly
	// but did not list htfifo). Capabilities is the raw list
	// returned by the peer when the body was parseable.
	Peers []HTFIFOCapabilityPeerStatus
}

// HTFIFOCapabilityPeerStatus is one peer's polling result.
type HTFIFOCapabilityPeerStatus struct {
	// Address is the peer address (bare host:port or full URL)
	// exactly as supplied to the poller.
	Address string

	// HasHTFIFO is true iff the peer's /sqs_health JSON body's
	// capabilities array contained the htfifo capability string.
	HasHTFIFO bool

	// Capabilities is the parsed capabilities array. Nil on any
	// failure before JSON parsing, or non-nil but missing
	// htfifo when the peer is on an older binary.
	Capabilities []string

	// Error is empty on a clean success (HTTP 200 + parseable
	// JSON, regardless of whether HasHTFIFO is true) and non-empty
	// on any failure (transport error, non-200 status, malformed
	// JSON, or context cancellation).
	Error string
}

// defaultSQSCapabilityPollTimeout caps how long the poller waits on
// any single peer when PollerConfig.PerPeerTimeout is zero. The
// §8.5 design's "fail-closed default for nodes that don't respond
// within a short timeout" turns into a concrete bound here.
// Operators wanting a longer or shorter wait can override via
// PollerConfig; the cap is enforced in addition to any
// caller-supplied context deadline so a single slow peer cannot
// stall the whole poll.
const defaultSQSCapabilityPollTimeout = 3 * time.Second

// PollerConfig tunes PollSQSHTFIFOCapability for a specific call
// site. All fields are optional; the zero value picks safe
// defaults. Tests use the explicit PerPeerTimeout to exercise the
// per-peer cap independently of any caller-supplied context
// deadline.
type PollerConfig struct {
	// HTTPClient is the client used for /sqs_health GETs. Nil
	// falls back to http.DefaultClient. Callers wanting connection
	// pooling, custom Transport, or shorter Client.Timeout pass
	// their own.
	HTTPClient *http.Client

	// PerPeerTimeout caps how long any single peer's poll runs
	// before being abandoned. Zero defaults to
	// defaultSQSCapabilityPollTimeout (3s). Tests pass a small
	// value (e.g. 100ms) so the per-peer cap path can be
	// exercised quickly without a parent context deadline.
	PerPeerTimeout time.Duration
}

// PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint
// concurrently and reports whether all advertise htfifo. The
// helper is stateless: every call dials its peers fresh, so a
// transient network blip on one call does not poison subsequent
// calls.
//
// Per-peer behaviour:
//   - GET http://<peer>/sqs_health with Accept: application/json
//   - Expect HTTP 200 and a parseable JSON body matching
//     {"status":"ok","capabilities":[...]}.
//   - HasHTFIFO is the membership of htfifo in capabilities.
//   - Any failure (transport error, non-200, malformed JSON,
//     timeout, context cancellation) records the reason in Error
//     and leaves HasHTFIFO=false. The poller never returns a
//     fatal error from PollSQSHTFIFOCapability itself; the report
//     carries every per-peer outcome instead.
//
// Concurrency: peers are polled in goroutines; results land via
// an indexed channel so the slice writes are obviously race-free.
//
// Timeouts: each peer poll is bounded by
// min(ctx.Deadline(), now+cfg.PerPeerTimeout). A long ctx deadline
// does not extend the per-peer cap, and an absent ctx deadline
// still triggers fail-closed at the per-peer cap.
func PollSQSHTFIFOCapability(ctx context.Context, peers []string, cfg PollerConfig) *HTFIFOCapabilityReport {
	client := cfg.HTTPClient
	if client == nil {
		client = http.DefaultClient
	}
	perPeerTimeout := cfg.PerPeerTimeout
	if perPeerTimeout <= 0 {
		perPeerTimeout = defaultSQSCapabilityPollTimeout
	}
	report := &HTFIFOCapabilityReport{
		Peers: make([]HTFIFOCapabilityPeerStatus, len(peers)),
	}
	if len(peers) == 0 {
		// Vacuously: every-of-empty is true. Operator decides
		// whether their peer list is meaningful.
		report.AllAdvertise = true
		return report
	}

	type indexedStatus struct {
		idx    int
		status HTFIFOCapabilityPeerStatus
	}
	results := make(chan indexedStatus, len(peers))
	var wg sync.WaitGroup
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, addr string) {
			defer wg.Done()
			results <- indexedStatus{
				idx:    idx,
				status: pollOneSQSPeerForHTFIFO(ctx, client, addr, perPeerTimeout),
			}
		}(i, peer)
	}
	wg.Wait()
	close(results)

	allAdvertise := true
	for r := range results {
		report.Peers[r.idx] = r.status
		if !r.status.HasHTFIFO {
			allAdvertise = false
		}
	}
	report.AllAdvertise = allAdvertise
	return report
}

// pollOneSQSPeerForHTFIFO polls a single peer's /sqs_health and
// returns its capability status. Any error is captured in the
// returned struct's Error field; this function never returns a
// Go error itself so the caller can map peers to results in one
// pass without checking len(errors).
func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string, perPeerTimeout time.Duration) HTFIFOCapabilityPeerStatus {
	status := HTFIFOCapabilityPeerStatus{Address: peer}

	if peer == "" {
		status.Error = "empty peer address"
		return status
	}

	pollCtx, cancel := context.WithTimeout(ctx, perPeerTimeout)
	defer cancel()

	url := buildSQSHealthURL(peer)
	req, err := http.NewRequestWithContext(pollCtx, http.MethodGet, url, http.NoBody)
	if err != nil {
		status.Error = errors.Wrapf(err, "build request for %q", peer).Error()
		return status
	}
	req.Header.Set("Accept", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		status.Error = errors.Wrapf(err, "GET %q", url).Error()
		return status
	}
	// Close the body via a deferred closure so a non-nil close
	// error surfaces in the cluster logs rather than being
	// dropped; masking it could hide leaking connections under
	// load (gemini medium on PR #721). Body is also drained on
	// the non-200 early return below so the http.Transport can
	// reuse the underlying TCP connection (claude minor on PR #721).
	defer func() {
		if cerr := resp.Body.Close(); cerr != nil {
			slog.Warn("sqs capability poller: response body close failed",
				"peer", peer, "err", cerr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		// Drain the body before returning so the transport can
		// reuse the connection. Non-200 bodies under our 1 KiB
		// LimitReader are tiny, so the discard cost is negligible.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
		status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode)
		return status
	}

	body, err := io.ReadAll(io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
	if err != nil {
		status.Error = errors.Wrapf(err, "read body from %q", url).Error()
		return status
	}

	var parsed sqsHealthBody
	if err := json.Unmarshal(body, &parsed); err != nil {
		status.Error = fmt.Sprintf("malformed JSON from %s: %v", url, err)
		return status
	}

	status.Capabilities = parsed.Capabilities
	for _, c := range parsed.Capabilities {
		if c == sqsCapabilityHTFIFO {
			status.HasHTFIFO = true
			break
		}
	}
	return status
}

// sqsCapabilityMaxBodyBytes caps how much of the /sqs_health
// response we read before bailing. The current body shape is a
// short JSON object; an unbounded read would let a misconfigured
// peer return megabytes. 1 KiB is far above the realistic body
// size and far below "expensive to read".
const sqsCapabilityMaxBodyBytes = 1 << 10

// buildSQSHealthURL prefixes peer with the http:// scheme when the
// caller passed a bare host:port (the common case for
// --raftSqsMap entries). Callers that need https:// can pass the
// fully-qualified URL.
func buildSQSHealthURL(peer string) string {
	if strings.HasPrefix(peer, "http://") || strings.HasPrefix(peer, "https://") {
		return strings.TrimRight(peer, "/") + sqsHealthPath
	}
	return "http://" + peer + sqsHealthPath
}
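
The scheme handling in `buildSQSHealthURL` is easiest to see with a few inputs. The standalone sketch below copies that logic under an assumption: the diff references `sqsHealthPath` without showing its definition, so `healthPath = "/sqs_health"` is inferred from the comments. `healthURL` and the example hostnames are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// healthPath is an assumed value; the commit references sqsHealthPath but
// does not show its definition.
const healthPath = "/sqs_health"

// healthURL mirrors the scheme handling shown above: keep an explicit
// http:// or https:// scheme, otherwise default to http://.
func healthURL(peer string) string {
	if strings.HasPrefix(peer, "http://") || strings.HasPrefix(peer, "https://") {
		return strings.TrimRight(peer, "/") + healthPath
	}
	return "http://" + peer + healthPath
}

func main() {
	for _, peer := range []string{
		"10.0.0.1:8080",                // bare host:port gets http://
		"https://node-a.example:8443",  // explicit scheme is preserved
		"https://node-a.example:8443/", // trailing slash is trimmed
	} {
		fmt.Printf("%-32s -> %s\n", peer, healthURL(peer))
	}
}
```

Note that trailing slashes are trimmed only on the full-URL branch; with this logic a bare `host:port/` would produce a double slash before the path.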
