Commit 2d9e06d

fix(sqs): drain non-200 body, log close err, options struct
PR #721 round 1 review fixes:

1) Gemini medium: Body.Close error silently ignored

   defer func() { _ = resp.Body.Close() }() dropped any close error. A failed Close indicates a connection that the http.Transport will tear down rather than reuse; under load this can mask leaking connections or file descriptors. The error is now logged via slog.Warn with the peer address so operators can grep the cluster log when triaging.

2) Claude minor: non-200 body not drained

   Calling resp.Body.Close() without first draining the body prevents the http.Transport from reusing the underlying TCP connection. In a control-plane path (one CreateQueue call per gate check) this is acceptable, but if the gate ever fans out across many peers under load, the failed-peer branch would force connection teardown on every error response. The body is now drained via io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes)) before the early return so the transport can reuse the connection.

3) Claude nit: per-peer cap not exercised independently

   TestPollSQSHTFIFOCapability_TimeoutFailsClosed used a 500ms parent ctx, so the request actually timed out via the parent context, not the per-peer cap. The default 3s per-peer cap was never independently exercised by tests.

   Refactored the signature to PollSQSHTFIFOCapability(ctx, peers, cfg PollerConfig). PollerConfig{HTTPClient, PerPeerTimeout} is the single options surface; zero values pick safe defaults. Renamed the existing test to TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed and added TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed, which uses context.Background() and PerPeerTimeout=100ms to exercise the cap independently.

   PerPeerTimeout is also a sensible operator knob: different cluster latencies want different bounds. Caller-side, only *_test.go files use the function today; PR 5's CreateQueue gate will pick the appropriate timeout when it wires this up.

4) Claude minor: buildSQSHealthURL double-path edge case

   A caller passing a URL that already includes the health path (e.g. "http://node:5050/sqs_health") would receive a doubled path. Added an explicit test case to TestBuildSQSHealthURL documenting the behavior and the contract ("pass a base URL or host:port, never a full request URL"). A future refactor can intentionally change the contract; the test will catch it.

5) Audit per the lessons-learned discipline

   PollSQSHTFIFOCapability is exported but has no production callers yet; only the test file references it. grep confirmed the API change is safe.
1 parent 9e5ecd1 commit 2d9e06d

2 files changed

Lines changed: 121 additions & 35 deletions

adapter/sqs_capability_poller.go

Lines changed: 57 additions & 13 deletions
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"log/slog"
 	"net/http"
 	"strings"
 	"sync"
@@ -63,13 +64,35 @@ type HTFIFOCapabilityPeerStatus struct {
 }
 
 // defaultSQSCapabilityPollTimeout caps how long the poller waits on
-// any single peer. The §8.5 design's "fail-closed default for
-// nodes that don't respond within a short timeout" turns into a
-// concrete bound here. Operators wanting a longer wait can pass
-// their own context with a deadline; the per-peer cap is enforced
-// in addition so a single slow peer cannot stall the whole poll.
+// any single peer when PollerConfig.PerPeerTimeout is zero. The
+// §8.5 design's "fail-closed default for nodes that don't respond
+// within a short timeout" turns into a concrete bound here.
+// Operators wanting a longer or shorter wait can override via
+// PollerConfig; the cap is enforced in addition to any
+// caller-supplied context deadline so a single slow peer cannot
+// stall the whole poll.
 const defaultSQSCapabilityPollTimeout = 3 * time.Second
 
+// PollerConfig tunes PollSQSHTFIFOCapability for a specific call
+// site. All fields are optional — the zero value picks safe
+// defaults. Tests use the explicit PerPeerTimeout to exercise the
+// per-peer cap independently of any caller-supplied context
+// deadline.
+type PollerConfig struct {
+	// HTTPClient is the client used for /sqs_health GETs. Nil
+	// falls back to http.DefaultClient. Callers wanting connection
+	// pooling, custom Transport, or shorter Client.Timeout pass
+	// their own.
+	HTTPClient *http.Client
+
+	// PerPeerTimeout caps how long any single peer's poll runs
+	// before being abandoned. Zero defaults to
+	// defaultSQSCapabilityPollTimeout (3s). Tests pass a small
+	// value (e.g. 100ms) so the per-peer cap path can be
+	// exercised quickly without a parent context deadline.
+	PerPeerTimeout time.Duration
+}
+
 // PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint
 // concurrently and reports whether all advertise htfifo. The
 // helper is stateless — every call dials its peers fresh, so a
@@ -90,13 +113,19 @@ const defaultSQSCapabilityPollTimeout = 3 * time.Second
 // Concurrency: peers are polled in goroutines; results land via
 // an indexed channel so the slice writes are obviously race-free.
 //
-// Timeouts: each peer poll is bounded by min(ctx.Deadline(),
-// defaultSQSCapabilityPollTimeout). A long ctx deadline does not
-// extend the per-peer cap.
-func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []string) *HTFIFOCapabilityReport {
+// Timeouts: each peer poll is bounded by
+// min(ctx.Deadline(), now+cfg.PerPeerTimeout). A long ctx deadline
+// does not extend the per-peer cap, and an absent ctx deadline
+// still triggers fail-closed at the per-peer cap.
+func PollSQSHTFIFOCapability(ctx context.Context, peers []string, cfg PollerConfig) *HTFIFOCapabilityReport {
+	client := cfg.HTTPClient
 	if client == nil {
 		client = http.DefaultClient
 	}
+	perPeerTimeout := cfg.PerPeerTimeout
+	if perPeerTimeout <= 0 {
+		perPeerTimeout = defaultSQSCapabilityPollTimeout
+	}
 	report := &HTFIFOCapabilityReport{
 		Peers: make([]HTFIFOCapabilityPeerStatus, len(peers)),
 	}
@@ -119,7 +148,7 @@ func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []s
 			defer wg.Done()
 			results <- indexedStatus{
 				idx:    idx,
-				status: pollOneSQSPeerForHTFIFO(ctx, client, addr),
+				status: pollOneSQSPeerForHTFIFO(ctx, client, addr, perPeerTimeout),
 			}
 		}(i, peer)
 	}
@@ -142,15 +171,15 @@ func PollSQSHTFIFOCapability(ctx context.Context, client *http.Client, peers []s
 // returned struct's Error field — this function never returns a
 // Go error itself so the caller can map peers to results in one
 // pass without checking len(errors).
-func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string) HTFIFOCapabilityPeerStatus {
+func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer string, perPeerTimeout time.Duration) HTFIFOCapabilityPeerStatus {
 	status := HTFIFOCapabilityPeerStatus{Address: peer}
 
 	if peer == "" {
 		status.Error = "empty peer address"
 		return status
 	}
 
-	pollCtx, cancel := context.WithTimeout(ctx, defaultSQSCapabilityPollTimeout)
+	pollCtx, cancel := context.WithTimeout(ctx, perPeerTimeout)
 	defer cancel()
 
 	url := buildSQSHealthURL(peer)
@@ -166,9 +195,24 @@ func pollOneSQSPeerForHTFIFO(ctx context.Context, client *http.Client, peer stri
 		status.Error = errors.Wrapf(err, "GET %q", url).Error()
 		return status
 	}
-	defer func() { _ = resp.Body.Close() }()
+	// Close the body via a deferred closure so a non-nil close
+	// error surfaces in the cluster logs rather than being
+	// dropped — masking it could hide leaking connections under
+	// load (gemini medium on PR #721). Body is also drained on
+	// every early return below so the http.Transport can reuse
+	// the underlying TCP connection (claude minor on PR #721).
+	defer func() {
+		if cerr := resp.Body.Close(); cerr != nil {
+			slog.Warn("sqs capability poller: response body close failed",
+				"peer", peer, "err", cerr)
+		}
+	}()
 
 	if resp.StatusCode != http.StatusOK {
+		// Drain the body before returning so the transport can
+		// reuse the connection. Non-200 bodies under our 1 KiB
+		// LimitReader are tiny, so the discard cost is negligible.
+		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
 		status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode)
 		return status
 	}

adapter/sqs_capability_poller_test.go

Lines changed: 64 additions & 22 deletions
@@ -46,7 +46,7 @@ func TestPollSQSHTFIFOCapability_AllAdvertise(t *testing.T) {
 	_, addr1 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}})
 	_, addr2 := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}})
 
-	report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr1, addr2})
+	report := PollSQSHTFIFOCapability(context.Background(), []string{addr1, addr2}, PollerConfig{})
 	require.True(t, report.AllAdvertise,
 		"all peers advertise → AllAdvertise must be true")
 	require.Len(t, report.Peers, 2)
@@ -66,7 +66,7 @@ func TestPollSQSHTFIFOCapability_OneMissingFailsClosed(t *testing.T) {
 	_, addrGood := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO}})
 	_, addrOld := newSQSHealthServer(t, sqsHealthBody{Status: "ok", Capabilities: []string{}})
 
-	report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addrGood, addrOld})
+	report := PollSQSHTFIFOCapability(context.Background(), []string{addrGood, addrOld}, PollerConfig{})
 	require.False(t, report.AllAdvertise,
 		"one peer without the capability must drop AllAdvertise")
 	require.Len(t, report.Peers, 2)
@@ -105,8 +105,8 @@ func TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed(t *testing.T) {
 	t.Cleanup(garbageSrv.Close)
 	addrGarbage := strings.TrimPrefix(garbageSrv.URL, "http://")
 
-	report := PollSQSHTFIFOCapability(context.Background(), nil,
-		[]string{addr500, addrUnreachable, addrGarbage})
+	report := PollSQSHTFIFOCapability(context.Background(),
+		[]string{addr500, addrUnreachable, addrGarbage}, PollerConfig{})
 
 	require.False(t, report.AllAdvertise,
 		"any transport / parse failure must fail closed")
@@ -121,15 +121,13 @@ func TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed(t *testing.T) {
 	require.Contains(t, report.Peers[2].Error, "malformed JSON")
 }
 
-// TestPollSQSHTFIFOCapability_TimeoutFailsClosed pins the
-// "fail-closed default for nodes that don't respond within a short
-// timeout" rule from §8.5: a peer that hangs past the per-peer
-// timeout must drop AllAdvertise without holding up the entire
-// poll for longer than that bound.
-func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) {
+// TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed
+// pins that an expiring parent ctx cancels the request — the
+// poll respects whichever bound (parent ctx or per-peer cap)
+// fires first.
+func TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed(t *testing.T) {
 	t.Parallel()
 
-	// Peer that delays past the test's timeout.
 	hangSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		select {
 		case <-r.Context().Done():
@@ -141,20 +139,52 @@ func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) {
 	t.Cleanup(hangSrv.Close)
 	addrHang := strings.TrimPrefix(hangSrv.URL, "http://")
 
-	// Use a context with a short bound to force the per-peer
-	// timeout path quickly. The test should finish in well under
-	// the 5s the server would have waited.
+	// Parent ctx expires before the per-peer cap (default 3s).
 	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
 	defer cancel()
 
 	start := time.Now()
-	report := PollSQSHTFIFOCapability(ctx, nil, []string{addrHang})
+	report := PollSQSHTFIFOCapability(ctx, []string{addrHang}, PollerConfig{})
 	elapsed := time.Since(start)
 
 	require.False(t, report.AllAdvertise)
 	require.Less(t, elapsed, 4*time.Second,
-		"poll must respect the per-peer timeout — a hung peer must "+
-			"not stall for the full server-side delay")
+		"poll must respect the parent ctx deadline")
+	require.NotEmpty(t, report.Peers[0].Error)
+}
+
+// TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed pins the
+// per-peer cap independently of any parent ctx deadline. With
+// context.Background() (no deadline) and PollerConfig.PerPeerTimeout
+// set short, the poll MUST still abandon a hung peer at the cap —
+// otherwise a missing parent deadline would let a single slow peer
+// stall a CreateQueue gate indefinitely (claude nit on PR #721
+// round 1).
+func TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed(t *testing.T) {
+	t.Parallel()
+
+	hangSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		select {
+		case <-r.Context().Done():
+			return
+		case <-time.After(5 * time.Second):
+			w.WriteHeader(http.StatusOK)
+		}
+	}))
+	t.Cleanup(hangSrv.Close)
+	addrHang := strings.TrimPrefix(hangSrv.URL, "http://")
+
+	// No parent deadline; per-peer cap is the only bound.
+	start := time.Now()
+	report := PollSQSHTFIFOCapability(context.Background(), []string{addrHang},
+		PollerConfig{PerPeerTimeout: 100 * time.Millisecond})
+	elapsed := time.Since(start)
+
+	require.False(t, report.AllAdvertise)
+	require.Less(t, elapsed, 2*time.Second,
+		"poll must respect the per-peer cap when there is no "+
+			"parent ctx deadline — a missing deadline must NOT let "+
+			"a hung peer stall the CreateQueue gate")
 	require.NotEmpty(t, report.Peers[0].Error)
 }
 
@@ -165,7 +195,7 @@ func TestPollSQSHTFIFOCapability_TimeoutFailsClosed(t *testing.T) {
 // report.
 func TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue(t *testing.T) {
 	t.Parallel()
-	report := PollSQSHTFIFOCapability(context.Background(), nil, nil)
+	report := PollSQSHTFIFOCapability(context.Background(), nil, PollerConfig{})
 	require.True(t, report.AllAdvertise)
 	require.Empty(t, report.Peers)
 }
@@ -176,7 +206,7 @@ func TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue(t *testing.T) {
 // a malformed URL and a confusing transport error.
 func TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed(t *testing.T) {
 	t.Parallel()
-	report := PollSQSHTFIFOCapability(context.Background(), nil, []string{""})
+	report := PollSQSHTFIFOCapability(context.Background(), []string{""}, PollerConfig{})
 	require.False(t, report.AllAdvertise)
 	require.Len(t, report.Peers, 1)
 	require.Equal(t, "empty peer address", report.Peers[0].Error)
@@ -191,7 +221,7 @@ func TestPollSQSHTFIFOCapability_FullURLPeer(t *testing.T) {
 	srv, _ := newSQSHealthServer(t, sqsHealthBody{
 		Status: "ok", Capabilities: []string{sqsCapabilityHTFIFO},
 	})
-	report := PollSQSHTFIFOCapability(context.Background(), nil, []string{srv.URL})
+	report := PollSQSHTFIFOCapability(context.Background(), []string{srv.URL}, PollerConfig{})
 	require.True(t, report.AllAdvertise)
 	require.True(t, report.Peers[0].HasHTFIFO)
 }
@@ -225,7 +255,7 @@ func TestPollSQSHTFIFOCapability_ConcurrentPolling(t *testing.T) {
 	}
 
 	start := time.Now()
-	report := PollSQSHTFIFOCapability(context.Background(), nil, peers)
+	report := PollSQSHTFIFOCapability(context.Background(), peers, PollerConfig{})
 	elapsed := time.Since(start)
 
 	require.True(t, report.AllAdvertise)
@@ -252,6 +282,18 @@ func TestBuildSQSHealthURL(t *testing.T) {
 		{"http://node.example:5050", "http://node.example:5050" + sqsHealthPath},
 		{"http://node.example:5050/", "http://node.example:5050" + sqsHealthPath},
 		{"https://node.example", "https://node.example" + sqsHealthPath},
+		// Caller passing a URL that ALREADY includes the health
+		// path: documented behaviour is that the helper appends
+		// the path again (claude minor on PR #721 round 1). The
+		// contract is "pass a base URL or a host:port, never a
+		// full request URL". This case pins the behaviour so a
+		// future refactor can either keep it (and reject misuse
+		// via CreateQueue input validation) or change the
+		// contract intentionally.
+		{
+			"http://node.example:5050" + sqsHealthPath,
+			"http://node.example:5050" + sqsHealthPath + sqsHealthPath,
+		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.peer, func(t *testing.T) {
@@ -282,7 +324,7 @@ func TestPollSQSHTFIFOCapability_RespectsBodyLimit(t *testing.T) {
 	t.Cleanup(bigSrv.Close)
 	addr := strings.TrimPrefix(bigSrv.URL, "http://")
 
-	report := PollSQSHTFIFOCapability(context.Background(), nil, []string{addr})
+	report := PollSQSHTFIFOCapability(context.Background(), []string{addr}, PollerConfig{})
 	require.False(t, report.AllAdvertise)
 	require.Contains(t, report.Peers[0].Error, "malformed JSON",
 		"truncated body must surface as JSON parse error, not as "+
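
The doubled-path case pinned in TestBuildSQSHealthURL is easier to see with a concrete function. buildSQSHealthURL itself is not shown in this diff, so the sketch below uses a hypothetical stand-in, buildHealthURL, written only to match the table entries above; the "http://" default for a bare host:port is an assumption, and "/sqs_health" is the path quoted in the commit message.

```go
package main

import (
	"fmt"
	"strings"
)

// sqsHealthPath matches the "/sqs_health" path quoted in the commit message.
const sqsHealthPath = "/sqs_health"

// buildHealthURL is a hypothetical stand-in for buildSQSHealthURL. It trims
// trailing slashes, adds a scheme when none is present, and always appends
// the health path without checking whether the input already ends with it.
func buildHealthURL(peer string) string {
	base := strings.TrimRight(peer, "/")
	if !strings.HasPrefix(base, "http://") && !strings.HasPrefix(base, "https://") {
		base = "http://" + base // assumed default scheme for bare host:port
	}
	return base + sqsHealthPath
}

func main() {
	fmt.Println(buildHealthURL("node.example:5050"))
	fmt.Println(buildHealthURL("http://node.example:5050/"))
	// Misuse: a full request URL yields a doubled path.
	fmt.Println(buildHealthURL("http://node.example:5050/sqs_health"))
}
```

Under these assumptions the third call yields a URL ending in "/sqs_health/sqs_health", which is the behaviour the new table entry documents rather than prevents.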
