From 9e5ecd1a4ec90d090b2fb86ae5a4b27df7f7e743 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:06:18 +0900 Subject: [PATCH 1/2] feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a stateless helper that polls each peer's /sqs_health endpoint and reports whether all advertise the htfifo capability. This is the building block PR 5's CreateQueue gate uses to refuse PartitionCount > 1 until every node in the cluster has the HT-FIFO data plane. What changes adapter/sqs_capability_poller.go (new file): - HTFIFOCapabilityReport: AllAdvertise + per-peer detail. - HTFIFOCapabilityPeerStatus: address, HasHTFIFO flag, raw capabilities slice, Error string for failure detail. - PollSQSHTFIFOCapability(ctx, client, peers): polls each peer concurrently. Returns AllAdvertise=false on any timeout, HTTP error, malformed JSON, or missing-capability — fail-closed per §8.5. - Vacuously AllAdvertise=true on empty peer list (caller is responsible for ensuring the peer list is meaningful). - Per-peer timeout capped by defaultSQSCapabilityPollTimeout (3s) so a single hung peer cannot stall the whole poll. - Body capped at 1 KiB via io.LimitReader so a misconfigured peer cannot drain memory. - Bare host:port and full http://… / https://… URLs both accepted — operators can front the endpoint with TLS or a proxy without the helper having to know. What does NOT change yet - htfifoCapabilityAdvertised stays false. PR 4-B-3b adds the §8 leadership-refusal hook (startup + per-acquisition observer) and flips this flag. - CreateQueue does NOT yet call PollSQSHTFIFOCapability — PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the capability check in the same commit. Tests adapter/sqs_capability_poller_test.go: 9 top-level tests across the contract surface. - AllAdvertise happy path with multiple peers. - One-bad-apple: a peer with empty capabilities drops AllAdvertise. - Transport failures (HTTP 500, connection refused, malformed JSON) all fail closed with non-empty Error. - Hung peer respects per-peer timeout — test bound is well below what a serial poll would take. - Empty peer list → AllAdvertise vacuously true. - Empty peer address → fail closed with explicit Error. - Full-URL peer (http:// or https://) accepted alongside bare host:port. - Concurrent polling: 5 peers each delaying 200ms must finish in well under 1 second (serial would take 1s+). - Body-size limit: a 10 KiB response truncated mid-string is surfaced as a JSON parse error, not a half-decoded value. - TestBuildSQSHealthURL covers the URL construction edge cases. --- adapter/sqs_capability_poller.go | 214 +++++++++++++++++++ adapter/sqs_capability_poller_test.go | 290 ++++++++++++++++++++++++++ 2 files changed, 504 insertions(+) create mode 100644 adapter/sqs_capability_poller.go create mode 100644 adapter/sqs_capability_poller_test.go diff --git a/adapter/sqs_capability_poller.go b/adapter/sqs_capability_poller.go new file mode 100644 index 00000000..47d6f836 --- /dev/null +++ b/adapter/sqs_capability_poller.go @@ -0,0 +1,214 @@ +package adapter + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/cockroachdb/errors" + json "github.com/goccy/go-json" +) + +// HTFIFOCapabilityReport summarises the result of polling each peer's +// /sqs_health endpoint for the htfifo capability. Used by the +// CreateQueue capability gate (Phase 3.D PR 5) and by operator +// tooling that needs to confirm a rolling upgrade has finished +// before enabling partitioned FIFO queues. +// +// AllAdvertise is the binary go/no-go signal for the gate; Peers +// carries per-node detail for log lines and operator triage. +type HTFIFOCapabilityReport struct { + // AllAdvertise is true iff every peer in the input list + // returned a /sqs_health body whose `capabilities` array + // contains the htfifo capability string. False on any timeout, + // HTTP error, malformed body, or missing-capability — the + // gate fails closed. + // + // Vacuously true on an empty peer list. The caller (CreateQueue + // gate) is responsible for ensuring the peer list reflects the + // current cluster membership before consulting this report. + AllAdvertise bool + + // Peers is the per-peer status, indexed in input order. Each + // entry has either HasHTFIFO=true (peer advertised the + // capability) or a non-empty Error explaining why the peer + // did not pass. Capabilities is the raw list returned by the + // peer when the body was parseable. + Peers []HTFIFOCapabilityPeerStatus +} + +// HTFIFOCapabilityPeerStatus is one peer's polling result. +type HTFIFOCapabilityPeerStatus struct { + // Address is the peer's host:port as supplied to the poller. + Address string + + // HasHTFIFO is true iff the peer's /sqs_health JSON body's + // capabilities array contained the htfifo capability string. + HasHTFIFO bool + + // Capabilities is the parsed capabilities array. Nil on any + // failure before JSON parsing, or non-nil but missing + // htfifo when the peer is on an older binary. + Capabilities []string + + // Error is empty on a clean success (HTTP 200 + parseable + // JSON, regardless of whether HasHTFIFO is true) and non-empty + // on any failure (transport error, non-200 status, malformed + // JSON, or context cancellation). + Error string +} + +// defaultSQSCapabilityPollTimeout caps how long the poller waits on +// any single peer. The §8.5 design's "fail-closed default for +// nodes that don't respond within a short timeout" turns into a +// concrete bound here. Operators wanting a longer wait can pass +// their own context with a deadline; the per-peer cap is enforced +// in addition so a single slow peer cannot stall the whole poll. +const defaultSQSCapabilityPollTimeout = 3 * time.Second + +// PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint +// concurrently and reports whether all advertise htfifo. The +// helper is stateless — every call dials its peers fresh, so a +// transient network blip on one call does not poison subsequent +// calls. +// +// Per-peer behaviour: +// - GET http:///sqs_health with Accept: application/json +// - Expect HTTP 200 and a parseable JSON body matching +// {"status":"ok","capabilities":[...]}. +// - HasHTFIFO is the membership of htfifo in capabilities. +// - Any failure (transport error, non-200, malformed JSON, +// timeout, context cancellation) records the reason in Error +// and leaves HasHTFIFO=false. The poller never returns a +// fatal error from PollSQSHTFIFOCapability itself; the report +// carries every per-peer outcome instead. +// +// Concurrency: peers are polled in goroutines; results land via +// an indexed channel so the slice writes are obviously race-free. +// +// Timeouts: each peer poll is bounded by min(ctx.Deadline(), +// defaultSQSCapabilityPollTimeout). A long ctx deadline does not +// extend the per-peer cap. +func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []string) *HTFIFOCapabilityReport { + if client == nil { + client = http.DefaultClient + } + report := &HTFIFOCapabilityReport{ + Peers: make([]HTFIFOCapabilityPeerStatus, len(peers)), + } + if len(peers) == 0 { + // Vacuously: every-of-empty is true. Operator decides + // whether their peer list is meaningful. + report.AllAdvertise = true + return report + } + + type indexedStatus struct { + idx int + status HTFIFOCapabilityPeerStatus + } + results := make(chan indexedStatus, len(peers)) + var wg sync.WaitGroup + for i, peer := range peers { + wg.Add(1) + go func(idx int, addr string) { + defer wg.Done() + results <- indexedStatus{ + idx: idx, + status: pollOneSQSPeerForHTFIFO(ctx, client, addr), + } + }(i, peer) + } + wg.Wait() + close(results) + + allAdvertise := true + for r := range results { + report.Peers[r.idx] = r.status + if !r.status.HasHTFIFO { + allAdvertise = false + } + } + report.AllAdvertise = allAdvertise + return report +} + +// pollOneSQSPeerForHTFIFO polls a single peer's /sqs_health and +// returns its capability status. Any error is captured in the +// returned struct's Error field — this function never returns a +// Go error itself so the caller can map peers to results in one +// pass without checking len(errors). +func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string) HTFIFOCapabilityPeerStatus { + status := HTFIFOCapabilityPeerStatus{Address: peer} + + if peer == "" { + status.Error = "empty peer address" + return status + } + + pollCtx, cancel := context.WithTimeout(ctx, defaultSQSCapabilityPollTimeout) + defer cancel() + + url := buildSQSHealthURL(peer) + req, err := http.NewRequestWithContext(pollCtx, http.MethodGet, url, http.NoBody) + if err != nil { + status.Error = errors.Wrapf(err, "build request for %q", peer).Error() + return status + } + req.Header.Set("Accept", "application/json") + + resp, err := client.Do(req) + if err != nil { + status.Error = errors.Wrapf(err, "GET %q", url).Error() + return status + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode) + return status + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes)) + if err != nil { + status.Error = errors.Wrapf(err, "read body from %q", url).Error() + return status + } + + var parsed sqsHealthBody + if err := json.Unmarshal(body, &parsed); err != nil { + status.Error = fmt.Sprintf("malformed JSON from %s: %v", url, err) + return status + } + + status.Capabilities = parsed.Capabilities + for _, c := range parsed.Capabilities { + if c == sqsCapabilityHTFIFO { + status.HasHTFIFO = true + break + } + } + return status +} + +// sqsCapabilityMaxBodyBytes caps how much of the /sqs_health +// response we read before bailing. The current body shape is a +// short JSON object; an unbounded read would let a misconfigured +// peer return megabytes. 1 KiB is far above the realistic body +// size and far below "expensive to read". +const sqsCapabilityMaxBodyBytes = 1 << 10 + +// buildSQSHealthURL prefixes peer with the http:// scheme when the +// caller passed a bare host:port (the common case for +// --raftSqsMap entries). Callers that need https:// can pass the +// fully-qualified URL. +func buildSQSHealthURL(peer string) string { + if strings.HasPrefix(peer, "http://") || strings.HasPrefix(peer, "https://") { + return strings.TrimRight(peer, "/") + sqsHealthPath + } + return "http://" + peer + sqsHealthPath +} diff --git a/adapter/sqs_capability_poller_test.go b/adapter/sqs_capability_poller_test.go new file mode 100644 index 00000000..cbdcc90d --- /dev/null +++ b/adapter/sqs_capability_poller_test.go @@ -0,0 +1,290 @@ +package adapter + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + json "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// newSQSHealthServer builds an httptest.Server that responds to +// GET /sqs_health with the given JSON body when Accept includes +// application/json. Returns the server and its address (host:port, +// suitable for the poller's bare-address path). +func newSQSHealthServer(t *testing.T, body sqsHealthBody) (*httptest.Server, string) { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != sqsHealthPath { + http.Error(w, "not found", http.StatusNotFound) + return + } + if r.Method != http.MethodGet && r.Method != http.MethodHead { + http.Error(w, "method", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + _ = json.NewEncoder(w).Encode(body) + })) + t.Cleanup(srv.Close) + addr := strings.TrimPrefix(srv.URL, "http://") + return srv, addr +} + +// TestPollSQSHTFIFOCapability_AllAdvertise pins the happy path: +// every peer responds with htfifo in capabilities → AllAdvertise +// is true and each peer's HasHTFIFO is true. +func TestPollSQSHTFIFOCapability_AllAdvertise(t *testing.T) { + t.Parallel() + _, addr1 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) + _, addr2 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) + + report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr1, addr2}) + require.True(t, report.AllAdvertise, + "all peers advertise → AllAdvertise must be true") + require.Len(t, report.Peers, 2) + for i, p := range report.Peers { + require.True(t, p.HasHTFIFO, "peer %d HasHTFIFO must be true", i) + require.Empty(t, p.Error, "peer %d Error must be empty", i) + require.Equal(t, []string{sqsCapabilityHTFIFO}, p.Capabilities) + } +} + +// TestPollSQSHTFIFOCapability_OneMissingFailsClosed pins the +// one-bad-apple invariant: a single peer missing the capability +// drops AllAdvertise to false. The other peers' detail is still +// returned for operator triage. +func TestPollSQSHTFIFOCapability_OneMissingFailsClosed(t *testing.T) { + t.Parallel() + _, addrGood := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) + _, addrOld := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{}}) + + report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addrGood, addrOld}) + require.False(t, report.AllAdvertise, + "one peer without the capability must drop AllAdvertise") + require.Len(t, report.Peers, 2) + require.True(t, report.Peers[0].HasHTFIFO) + require.False(t, report.Peers[1].HasHTFIFO, + "old peer's HasHTFIFO must be false") + require.Empty(t, report.Peers[1].Error, + "old peer responded successfully — Error must be empty even "+ + "though HasHTFIFO is false") + require.Equal(t, []string{}, report.Peers[1].Capabilities, + "empty capabilities slice is the legitimate \"old binary\" signal") +} + +// TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed pins the +// transport-failure path: a peer that returns 500, refuses +// connections, or returns a malformed body all drop AllAdvertise +// and record the reason in Error. +func TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed(t *testing.T) { + t.Parallel() + + // Peer that returns 500. + errSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + t.Cleanup(errSrv.Close) + addr500 := strings.TrimPrefix(errSrv.URL, "http://") + + // Peer that doesn't exist (connection refused). + addrUnreachable := "127.0.0.1:1" // port 1 → connection refused on most systems + + // Peer that returns garbage JSON. + garbageSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, "not json {{{") + })) + t.Cleanup(garbageSrv.Close) + addrGarbage := strings.TrimPrefix(garbageSrv.URL, "http://") + + report := PollSQSHTFIFOCapability(context.Background(), nil, + []string{addr500, addrUnreachable, addrGarbage}) + + require.False(t, report.AllAdvertise, + "any transport / parse failure must fail closed") + for _, p := range report.Peers { + require.False(t, p.HasHTFIFO) + require.NotEmpty(t, p.Error, + "peer %s: every failure branch must record an Error string "+ + "so operators can triage", p.Address) + } + + require.Contains(t, report.Peers[0].Error, "HTTP 500") + require.Contains(t, report.Peers[2].Error, "malformed JSON") +} + +// TestPollSQSHTFIFOCapability_TimeoutFailsClosed pins the +// "fail-closed default for nodes that don't respond within a short +// timeout" rule from §8.5: a peer that hangs past the per-peer +// timeout must drop AllAdvertise without holding up the entire +// poll for longer than that bound. +func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) { + t.Parallel() + + // Peer that delays past the test's timeout. + hangSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + return + case <-time.After(5 * time.Second): + w.WriteHeader(http.StatusOK) + } + })) + t.Cleanup(hangSrv.Close) + addrHang := strings.TrimPrefix(hangSrv.URL, "http://") + + // Use a context with a short bound to force the per-peer + // timeout path quickly. The test should finish in well under + // the 5s the server would have waited. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + start := time.Now() + report := PollSQSHTFIFOCapability(ctx, nil, []string{addrHang}) + elapsed := time.Since(start) + + require.False(t, report.AllAdvertise) + require.Less(t, elapsed, 4*time.Second, + "poll must respect the per-peer timeout — a hung peer must "+ + "not stall for the full server-side delay") + require.NotEmpty(t, report.Peers[0].Error) +} + +// TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue pins the +// no-peers behaviour: with no peers to consult, every-of-empty is +// vacuously true. The caller (CreateQueue gate) is responsible for +// ensuring the peer list is meaningful before consulting the +// report. +func TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue(t *testing.T) { + t.Parallel() + report := PollSQSHTFIFOCapability(context.Background(), nil, nil) + require.True(t, report.AllAdvertise) + require.Empty(t, report.Peers) +} + +// TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed pins +// that an empty string in the peer slice is treated as a config +// error and surfaced via Error. Otherwise a "" entry would produce +// a malformed URL and a confusing transport error. +func TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed(t *testing.T) { + t.Parallel() + report := PollSQSHTFIFOCapability(context.Background(), nil, []string{""}) + require.False(t, report.AllAdvertise) + require.Len(t, report.Peers, 1) + require.Equal(t, "empty peer address", report.Peers[0].Error) +} + +// TestPollSQSHTFIFOCapability_FullURLPeer pins that a caller can +// pass a fully-qualified URL (http:// or https://) instead of the +// bare host:port form. Lets operators front the endpoint with TLS +// or a proxy without the helper having to know about it. +func TestPollSQSHTFIFOCapability_FullURLPeer(t *testing.T) { + t.Parallel() + srv, _ := newSQSHealthServer(t, sqsHealthBody{ + Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}, + }) + report := PollSQSHTFIFOCapability(context.Background(), nil, []string{srv.URL}) + require.True(t, report.AllAdvertise) + require.True(t, report.Peers[0].HasHTFIFO) +} + +// TestPollSQSHTFIFOCapability_ConcurrentPolling pins that peers +// are polled in parallel — N peers each with a 200ms delay must +// finish in well under N*200ms. Without concurrent polling, a +// rolling upgrade with many nodes would gate every CreateQueue on +// a serial walk. +func TestPollSQSHTFIFOCapability_ConcurrentPolling(t *testing.T) { + t.Parallel() + const peerCount = 5 + const perPeerDelay = 200 * time.Millisecond + + hits := atomic.Int64{} + mkSrv := func() string { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + hits.Add(1) + time.Sleep(perPeerDelay) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(sqsHealthBody{ + Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}, + }) + })) + t.Cleanup(srv.Close) + return strings.TrimPrefix(srv.URL, "http://") + } + peers := make([]string, peerCount) + for i := range peers { + peers[i] = mkSrv() + } + + start := time.Now() + report := PollSQSHTFIFOCapability(context.Background(), nil, peers) + elapsed := time.Since(start) + + require.True(t, report.AllAdvertise) + require.Equal(t, int64(peerCount), hits.Load()) + // Serial poll would take peerCount * perPeerDelay = 1000ms. + // Concurrent should be ~perPeerDelay (with some scheduler + // slack). Allow generous bound. + require.Less(t, elapsed, time.Duration(peerCount-1)*perPeerDelay, + "poll elapsed %v — peers must be polled concurrently, "+ + "not serially", elapsed) +} + +// TestBuildSQSHealthURL pins the URL-construction edge cases. +// The poller is otherwise opaque about its URL formation; this +// test is the hook for the bare-host-port vs full-URL contract. +func TestBuildSQSHealthURL(t *testing.T) { + t.Parallel() + cases := []struct { + peer string + want string + }{ + {"127.0.0.1:5050", "http://127.0.0.1:5050" + sqsHealthPath}, + {"node.example:5050", "http://node.example:5050" + sqsHealthPath}, + {"http://node.example:5050", "http://node.example:5050" + sqsHealthPath}, + {"http://node.example:5050/", "http://node.example:5050" + sqsHealthPath}, + {"https://node.example", "https://node.example" + sqsHealthPath}, + } + for _, tc := range cases { + t.Run(tc.peer, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, buildSQSHealthURL(tc.peer)) + }) + } +} + +// TestPollSQSHTFIFOCapability_RespectsBodyLimit pins that a +// pathologically-large /sqs_health response is bounded — the +// poller will not consume megabytes from a misconfigured peer. +// Tests the io.LimitReader cap. +func TestPollSQSHTFIFOCapability_RespectsBodyLimit(t *testing.T) { + t.Parallel() + // Server emits a body that exceeds the limit. The poller's + // LimitReader will truncate it, JSON parse will fail, and the + // peer will fail closed — pin that path. + bigSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + // Valid JSON prefix but a giant string field that the + // LimitReader will cut mid-value, leaving the body + // unparseable. + _, _ = fmt.Fprint(w, `{"status":"ok","capabilities":["`) + _, _ = fmt.Fprint(w, strings.Repeat("X", 10*sqsCapabilityMaxBodyBytes)) + _, _ = fmt.Fprint(w, `"]}`) + })) + t.Cleanup(bigSrv.Close) + addr := strings.TrimPrefix(bigSrv.URL, "http://") + + report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr}) + require.False(t, report.AllAdvertise) + require.Contains(t, report.Peers[0].Error, "malformed JSON", + "truncated body must surface as JSON parse error, not as "+ + "a successful read of garbage capabilities") +} From 2d9e06d83b0e1453bac2fff4c760cb03ba1fbca0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:16:56 +0900 Subject: [PATCH 2/2] fix(sqs): drain non-200 body, log close err, options struct MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #721 round 1 review fixes: 1) Gemini medium — Body.Close error silently ignored defer func() { _ = resp.Body.Close() }() dropped any close error. A failed Close indicates a connection that the http.Transport will tear down rather than reuse — under load this can mask leaking connections / file descriptors. Logged via slog.Warn with the peer address so operators can grep the cluster log when triaging. 2) Claude minor — non-200 body not drained resp.Body.Close() without first draining the body prevents the http.Transport from reusing the underlying TCP connection. In a control-plane path (one CreateQueue call per gate check) this is acceptable, but if the gate ever fans out across many peers under load, the failed-peer branch would force connection teardown on every error response. Drain via io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes)) before the early return so the transport can reuse the connection. 3) Claude nit — per-peer cap not exercised independently TestPollSQSHTFIFOCapability_TimeoutFailsClosed used a 500ms parent ctx — the request actually timed out via the parent context, not the per-peer cap. The default 3s per-peer cap was never independently exercised by tests. Refactored signature to PollSQSHTFIFOCapability(ctx, peers, cfg PollerConfig). PollerConfig{HTTPClient, PerPeerTimeout} is the single options surface — zero values pick safe defaults. Renamed the existing test to TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed and added TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed which uses context.Background() and PerPeerTimeout=100ms to exercise the cap independently. PerPeerTimeout is also a sensible operator knob — different cluster latencies want different bounds. Caller-side: only *_test.go files use the function today; PR 5's CreateQueue gate will pick the appropriate timeout when it wires this up. 4) Claude minor — buildSQSHealthURL double-path edge case A caller passing a URL that already includes the health path (e.g. "http://node:5050/sqs_health") would receive a doubled path. Added an explicit test case to TestBuildSQSHealthURL documenting the behavior and the contract ("pass a base URL or host:port, never a full request URL"). A future refactor can intentionally change the contract; the test will catch it. 5) Audit per the lessons-learned discipline PollSQSHTFIFOCapability is exported but has no production callers yet — only the test file references it. grep confirmed the API change is safe. --- adapter/sqs_capability_poller.go | 70 ++++++++++++++++++---- adapter/sqs_capability_poller_test.go | 86 ++++++++++++++++++++------- 2 files changed, 121 insertions(+), 35 deletions(-) diff --git a/adapter/sqs_capability_poller.go b/adapter/sqs_capability_poller.go index 47d6f836..0a3aeed9 100644 --- a/adapter/sqs_capability_poller.go +++ b/adapter/sqs_capability_poller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "log/slog" "net/http" "strings" "sync" @@ -63,13 +64,35 @@ type HTFIFOCapabilityPeerStatus struct { } // defaultSQSCapabilityPollTimeout caps how long the poller waits on -// any single peer. The §8.5 design's "fail-closed default for -// nodes that don't respond within a short timeout" turns into a -// concrete bound here. Operators wanting a longer wait can pass -// their own context with a deadline; the per-peer cap is enforced -// in addition so a single slow peer cannot stall the whole poll. +// any single peer when PollerConfig.PerPeerTimeout is zero. The +// §8.5 design's "fail-closed default for nodes that don't respond +// within a short timeout" turns into a concrete bound here. +// Operators wanting a longer or shorter wait can override via +// PollerConfig; the cap is enforced in addition to any +// caller-supplied context deadline so a single slow peer cannot +// stall the whole poll. const defaultSQSCapabilityPollTimeout = 3 * time.Second +// PollerConfig tunes PollSQSHTFIFOCapability for a specific call +// site. All fields are optional — the zero value picks safe +// defaults. Tests use the explicit PerPeerTimeout to exercise the +// per-peer cap independently of any caller-supplied context +// deadline. +type PollerConfig struct { + // HTTPClient is the client used for /sqs_health GETs. Nil + // falls back to http.DefaultClient. Callers wanting connection + // pooling, custom Transport, or shorter Client.Timeout pass + // their own. + HTTPClient *http.Client + + // PerPeerTimeout caps how long any single peer's poll runs + // before being abandoned. Zero defaults to + // defaultSQSCapabilityPollTimeout (3s). Tests pass a small + // value (e.g. 100ms) so the per-peer cap path can be + // exercised quickly without a parent context deadline. + PerPeerTimeout time.Duration +} + // PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint // concurrently and reports whether all advertise htfifo. The // helper is stateless — every call dials its peers fresh, so a @@ -90,13 +113,19 @@ const defaultSQSCapabilityPollTimeout = 3 * time.Second // Concurrency: peers are polled in goroutines; results land via // an indexed channel so the slice writes are obviously race-free. // -// Timeouts: each peer poll is bounded by min(ctx.Deadline(), -// defaultSQSCapabilityPollTimeout). A long ctx deadline does not -// extend the per-peer cap. -func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []string) *HTFIFOCapabilityReport { +// Timeouts: each peer poll is bounded by +// min(ctx.Deadline(), now+cfg.PerPeerTimeout). A long ctx deadline +// does not extend the per-peer cap, and an absent ctx deadline +// still triggers fail-closed at the per-peer cap. +func PollSQSHTFIFOCapability(ctx context.Context, peers []string, cfg PollerConfig) *HTFIFOCapabilityReport { + client := cfg.HTTPClient if client == nil { client = http.DefaultClient } + perPeerTimeout := cfg.PerPeerTimeout + if perPeerTimeout <= 0 { + perPeerTimeout = defaultSQSCapabilityPollTimeout + } report := &HTFIFOCapabilityReport{ Peers: make([]HTFIFOCapabilityPeerStatus, len(peers)), } @@ -119,7 +148,7 @@ func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []s defer wg.Done() results <- indexedStatus{ idx: idx, - status: pollOneSQSPeerForHTFIFO(ctx, client, addr), + status: pollOneSQSPeerForHTFIFO(ctx, client, addr, perPeerTimeout), } }(i, peer) } @@ -142,7 +171,7 @@ func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []s // returned struct's Error field — this function never returns a // Go error itself so the caller can map peers to results in one // pass without checking len(errors). -func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string) HTFIFOCapabilityPeerStatus { +func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string, perPeerTimeout time.Duration) HTFIFOCapabilityPeerStatus { status := HTFIFOCapabilityPeerStatus{Address: peer} if peer == "" { @@ -150,7 +179,7 @@ func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer stri return status } - pollCtx, cancel := context.WithTimeout(ctx, defaultSQSCapabilityPollTimeout) + pollCtx, cancel := context.WithTimeout(ctx, perPeerTimeout) defer cancel() url := buildSQSHealthURL(peer) @@ -166,9 +195,24 @@ func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer stri status.Error = errors.Wrapf(err, "GET %q", url).Error() return status } - defer func() { _ = resp.Body.Close() }() + // Close the body via a deferred closure so a non-nil close + // error surfaces in the cluster logs rather than being + // dropped — masking it could hide leaking connections under + // load (gemini medium on PR #721). Body is also drained on + // every early return below so the http.Transport can reuse + // the underlying TCP connection (claude minor on PR #721). + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + slog.Warn("sqs capability poller: response body close failed", + "peer", peer, "err", cerr) + } + }() if resp.StatusCode != http.StatusOK { + // Drain the body before returning so the transport can + // reuse the connection. Non-200 bodies under our 1 KiB + // LimitReader are tiny, so the discard cost is negligible. + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes)) status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode) return status } diff --git a/adapter/sqs_capability_poller_test.go b/adapter/sqs_capability_poller_test.go index cbdcc90d..5defec6f 100644 --- a/adapter/sqs_capability_poller_test.go +++ b/adapter/sqs_capability_poller_test.go @@ -46,7 +46,7 @@ func TestPollSQSHTFIFOCapability_AllAdvertise(t *testing.T) { _, addr1 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) _, addr2 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) - report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr1, addr2}) + report := PollSQSHTFIFOCapability(context.Background(), []string{addr1, addr2}, PollerConfig{}) require.True(t, report.AllAdvertise, "all peers advertise → AllAdvertise must be true") require.Len(t, report.Peers, 2) @@ -66,7 +66,7 @@ func TestPollSQSHTFIFOCapability_OneMissingFailsClosed(t *testing.T) { _, addrGood := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}}) _, addrOld := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{}}) - report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addrGood, addrOld}) + report := PollSQSHTFIFOCapability(context.Background(), []string{addrGood, addrOld}, PollerConfig{}) require.False(t, report.AllAdvertise, "one peer without the capability must drop AllAdvertise") require.Len(t, report.Peers, 2) @@ -105,8 +105,8 @@ func TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed(t *testing.T) { t.Cleanup(garbageSrv.Close) addrGarbage := strings.TrimPrefix(garbageSrv.URL, "http://") - report := PollSQSHTFIFOCapability(context.Background(), nil, - []string{addr500, addrUnreachable, addrGarbage}) + report := PollSQSHTFIFOCapability(context.Background(), + []string{addr500, addrUnreachable, addrGarbage}, PollerConfig{}) require.False(t, report.AllAdvertise, "any transport / parse failure must fail closed") @@ -121,15 +121,13 @@ func TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed(t *testing.T) { require.Contains(t, report.Peers[2].Error, "malformed JSON") } -// TestPollSQSHTFIFOCapability_TimeoutFailsClosed pins the -// "fail-closed default for nodes that don't respond within a short -// timeout" rule from §8.5: a peer that hangs past the per-peer -// timeout must drop AllAdvertise without holding up the entire -// poll for longer than that bound. -func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) { +// TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed +// pins that an expiring parent ctx cancels the request — the +// poll respects whichever bound (parent ctx or per-peer cap) +// fires first. +func TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed(t *testing.T) { t.Parallel() - // Peer that delays past the test's timeout. hangSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { select { case <-r.Context().Done(): @@ -141,20 +139,52 @@ func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) { t.Cleanup(hangSrv.Close) addrHang := strings.TrimPrefix(hangSrv.URL, "http://") - // Use a context with a short bound to force the per-peer - // timeout path quickly. The test should finish in well under - // the 5s the server would have waited. + // Parent ctx expires before the per-peer cap (default 3s). ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() start := time.Now() - report := PollSQSHTFIFOCapability(ctx, nil, []string{addrHang}) + report := PollSQSHTFIFOCapability(ctx, []string{addrHang}, PollerConfig{}) elapsed := time.Since(start) require.False(t, report.AllAdvertise) require.Less(t, elapsed, 4*time.Second, - "poll must respect the per-peer timeout — a hung peer must "+ - "not stall for the full server-side delay") + "poll must respect the parent ctx deadline") + require.NotEmpty(t, report.Peers[0].Error) +} + +// TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed pins the +// per-peer cap independently of any parent ctx deadline. With +// context.Background() (no deadline) and PollerConfig.PerPeerTimeout +// set short, the poll MUST still abandon a hung peer at the cap — +// otherwise a missing parent deadline would let a single slow peer +// stall a CreateQueue gate indefinitely (claude nit on PR #721 +// round 1). +func TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed(t *testing.T) { + t.Parallel() + + hangSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + return + case <-time.After(5 * time.Second): + w.WriteHeader(http.StatusOK) + } + })) + t.Cleanup(hangSrv.Close) + addrHang := strings.TrimPrefix(hangSrv.URL, "http://") + + // No parent deadline; per-peer cap is the only bound. + start := time.Now() + report := PollSQSHTFIFOCapability(context.Background(), []string{addrHang}, + PollerConfig{PerPeerTimeout: 100 * time.Millisecond}) + elapsed := time.Since(start) + + require.False(t, report.AllAdvertise) + require.Less(t, elapsed, 2*time.Second, + "poll must respect the per-peer cap when there is no "+ + "parent ctx deadline — a missing deadline must NOT let "+ + "a hung peer stall the CreateQueue gate") require.NotEmpty(t, report.Peers[0].Error) } @@ -165,7 +195,7 @@ func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) { // report. func TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue(t *testing.T) { t.Parallel() - report := PollSQSHTFIFOCapability(context.Background(), nil, nil) + report := PollSQSHTFIFOCapability(context.Background(), nil, PollerConfig{}) require.True(t, report.AllAdvertise) require.Empty(t, report.Peers) } @@ -176,7 +206,7 @@ func TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue(t *testing.T) { // a malformed URL and a confusing transport error. func TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed(t *testing.T) { t.Parallel() - report := PollSQSHTFIFOCapability(context.Background(), nil, []string{""}) + report := PollSQSHTFIFOCapability(context.Background(), []string{""}, PollerConfig{}) require.False(t, report.AllAdvertise) require.Len(t, report.Peers, 1) require.Equal(t, "empty peer address", report.Peers[0].Error) @@ -191,7 +221,7 @@ func TestPollSQSHTFIFOCapability_FullURLPeer(t *testing.T) { srv, _ := newSQSHealthServer(t, sqsHealthBody{ Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}, }) - report := PollSQSHTFIFOCapability(context.Background(), nil, []string{srv.URL}) + report := PollSQSHTFIFOCapability(context.Background(), []string{srv.URL}, PollerConfig{}) require.True(t, report.AllAdvertise) require.True(t, report.Peers[0].HasHTFIFO) } @@ -225,7 +255,7 @@ func TestPollSQSHTFIFOCapability_ConcurrentPolling(t *testing.T) { } start := time.Now() - report := PollSQSHTFIFOCapability(context.Background(), nil, peers) + report := PollSQSHTFIFOCapability(context.Background(), peers, PollerConfig{}) elapsed := time.Since(start) require.True(t, report.AllAdvertise) @@ -252,6 +282,18 @@ func TestBuildSQSHealthURL(t *testing.T) { {"http://node.example:5050", "http://node.example:5050" + sqsHealthPath}, {"http://node.example:5050/", "http://node.example:5050" + sqsHealthPath}, {"https://node.example", "https://node.example" + sqsHealthPath}, + // Caller passing a URL that ALREADY includes the health + // path: documented behaviour is that the helper appends + // the path again (claude minor on PR #721 round 1). The + // contract is "pass a base URL or a host:port, never a + // full request URL". This case pins the behaviour so a + // future refactor can either keep it (and reject misuse + // via CreateQueue input validation) or change the + // contract intentionally. + { + "http://node.example:5050" + sqsHealthPath, + "http://node.example:5050" + sqsHealthPath + sqsHealthPath, + }, } for _, tc := range cases { t.Run(tc.peer, func(t *testing.T) { @@ -282,7 +324,7 @@ func TestPollSQSHTFIFOCapability_RespectsBodyLimit(t *testing.T) { t.Cleanup(bigSrv.Close) addr := strings.TrimPrefix(bigSrv.URL, "http://") - report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr}) + report := PollSQSHTFIFOCapability(context.Background(), []string{addr}, PollerConfig{}) require.False(t, report.AllAdvertise) require.Contains(t, report.Peers[0].Error, "malformed JSON", "truncated body must surface as JSON parse error, not as "+