Skip to content

Commit f81592f

Browse files
committed
feat(dist): Phase B production-readiness — SWIM indirect probes, hint-queue retry, wire compression, and queueHint race fix
Phase B.1 — SWIM-style indirect heartbeat probes: - Add WithDistIndirectProbes(k, timeout) option; when a direct probe fails, up to k random alive relays probe the target on the caller's behalf — target is only marked suspect if every relay also fails - Add /internal/probe HTTP endpoint and IndirectHealth() transport method - Refuted direct failures now refresh LastSeen rather than escalating - Expose dist.heartbeat.indirect_probe.{success,failure,refuted} metrics Phase B.2 — migration failure retry via hint queue: - migrateIfNeeded queues a hint on ForwardSet failure instead of logging-and-dropping silently - replicateTo hint enqueue broadened from ErrBackendNotFound-only to any transport error (timeouts, 5xx, connection resets) - Fix race in queueHint: snapshot hintBytes under hintsMu before unlock to prevent concurrent adjustHintAccounting in the replay loop from racing the metric write Phase B.3 — on-wire gzip compression for the dist HTTP transport: - Add DistHTTPLimits.CompressionThreshold; ForwardSet gzip-compresses Set request bodies exceeding the threshold; server decompresses transparently via fiber v3 Content-Encoding auto-decoding Refactor: extract membershipSnapshot() helper from Metrics() to keep function length under the lint cap Add contract tests for all three phases and the queueHint race fix
1 parent 4d5b5f9 commit f81592f

9 files changed

Lines changed: 877 additions & 48 deletions

CHANGELOG.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,17 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
88

9+
### Fixed
10+
11+
- **Race in `queueHint` between hint enqueue and hint replay.** Pre-fix,
12+
the metric write `dm.metrics.hintedBytes.Store(dm.hintBytes)` happened
13+
*after* releasing `hintsMu`, so a concurrent `adjustHintAccounting`
14+
call from the replay loop could race the read. Capturing the value
15+
under the lock closes the race. Surfaced when migration failures
16+
began funneling through `queueHint` (Phase B.2 below) — previously
17+
the migration path swallowed errors silently, so the hint enqueue
18+
rate from rebalance ticks was much lower.
19+
920
### Added
1021

1122
- **Structured logging on the dist backend.** New `WithDistLogger(*slog.Logger)`
@@ -46,6 +57,46 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
4657
against a stopped backend. Library default is a no-op meter, so
4758
metrics cost nothing unless the caller opts in. Phase A.3 of the
4859
production-readiness work.
60+
- **SWIM-style indirect heartbeat probes.** New
61+
`WithDistIndirectProbes(k, timeout)` option enables the indirect-
62+
probe refutation path: when a direct heartbeat to a peer fails,
63+
this node asks `k` random alive peers to probe the target on its
64+
behalf, and only marks the target suspect if every relay also
65+
fails. Filters caller-side network blips (transient NIC reset,
66+
single stuck connection in this node's pool) that would otherwise
67+
cause spurious suspect/dead transitions. New transport method
68+
`IndirectHealth(ctx, relayNodeID, targetNodeID)` and HTTP endpoint
69+
`GET /internal/probe?target=<id>` carry the probe; auth-wrapped
70+
identically to the rest of `/internal/*`. New metrics
71+
`dist.heartbeat.indirect_probe.success`, `.failure`, `.refuted`
72+
expose probe outcomes. `k = 0` (default) preserves the pre-Phase-B
73+
behavior. Phase B.1 of the production-readiness work — note that
74+
the heartbeat path still carries the `experimental` marker until
75+
self-refutation via incarnation-disseminating gossip lands in a
76+
later phase.
77+
- **Migration failures now retry through the hint queue.** When a
78+
rebalance forwards a key to its new primary and the transport
79+
returns *any* error (not just `ErrBackendNotFound`), the item is
80+
enqueued onto the existing hint-replay queue keyed by the new
81+
primary, instead of being logged and dropped. The hint-replay
82+
loop drains it on its configured schedule until the hint TTL
83+
expires. Same broadening applies to the `replicateTo` fan-out on
84+
the primary `Set` path — transient HTTP failures (timeout, 5xx,
85+
connection reset) no longer silently drop replicas. Phase B.2 of
86+
the production-readiness work.
87+
- **On-wire compression for the dist HTTP transport.** New
88+
`DistHTTPLimits.CompressionThreshold` field opts the auto-created
89+
HTTP client into gzip-compressing Set request bodies whose
90+
serialized payload exceeds the configured byte threshold. The
91+
client sets `Content-Encoding: gzip` and the server transparently
92+
decompresses (via fiber v3's auto-decoding `Body()`). Threshold
93+
`0` (default) preserves the pre-Phase-B wire format byte-for-byte.
94+
Operators on bandwidth-constrained links with values above ~1 KiB
95+
typically see meaningful reductions; below-threshold values pay
96+
no compression cost. Roll out the threshold to all peers before
97+
raising it on any peer — a server with compression disabled will
98+
reject a gzip body with HTTP 400. Phase B.3 of the
99+
production-readiness work.
49100

50101
## [0.5.0] — 2026-05-05
51102

pkg/backend/dist_http_server.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/subtle"
66
"crypto/tls"
7+
"errors"
78
"log/slog"
89
"net"
910
"net/http"
@@ -230,6 +231,20 @@ type DistHTTPLimits struct {
230231
// ClientAuth=tls.RequireAndVerifyClientCert. The auto-client uses
231232
// the same cert as its client cert via Certificates[0].
232233
TLSConfig *tls.Config
234+
235+
// CompressionThreshold opts the dist HTTP transport into gzip
236+
// compression of Set request bodies whose serialized payload size
237+
// exceeds this many bytes. The client sets `Content-Encoding:
238+
// gzip` and the server transparently decompresses before
239+
// unmarshaling. 0 disables compression — matches the pre-Phase-B
240+
// wire format byte-for-byte. Operators on bandwidth-constrained
241+
// links with large values (>1 KiB) typically see meaningful
242+
// reductions; values smaller than the threshold pay no cost.
243+
//
244+
// Server compatibility: a server with compression disabled will
245+
// reject a gzip-encoded body with HTTP 400. Roll out the threshold
246+
// to all peers before raising it on any peer.
247+
CompressionThreshold int
233248
}
234249

235250
// withDefaults fills any zero-valued field on l with the package default.
@@ -337,6 +352,7 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error {
337352
s.registerGet(dm)
338353
s.registerRemove(dm)
339354
s.registerHealth()
355+
s.registerProbe(dm)
340356
s.registerMerkle(dm)
341357

342358
return s.listen(bindCtx)
@@ -346,6 +362,11 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error {
346362
// fan-outs to replicas. Uses s.ctx (server-lifecycle) as the backend
347363
// operation context — see the comment on distHTTPServer.ctx for why we
348364
// can't use the per-request fiber.Ctx here.
365+
//
366+
// Compression note: fiber v3's Body() auto-decompresses based on the
367+
// inbound `Content-Encoding` header, so this handler does not need
368+
// explicit gzip handling — it sees the plaintext JSON regardless of
369+
// whether the client compressed (CompressionThreshold > 0) or not.
349370
func (s *distHTTPServer) handleSet(fctx fiber.Ctx, dm *DistMemory) error {
350371
var req httpSetRequest
351372

@@ -434,6 +455,38 @@ func (s *distHTTPServer) registerHealth() {
434455
s.app.Get("/health", s.wrapAuth(func(fctx fiber.Ctx) error { return fctx.SendString("ok") }))
435456
}
436457

458+
// registerProbe wires `/internal/probe?target=<id>` — the indirect-probe
459+
// relay endpoint used by the SWIM heartbeat path. The relay node calls
460+
// its own transport's Health(target) and reports the result. 200 = relay
461+
// reached the target; 502 = relay's probe failed; 404 = target unknown
462+
// to the relay; 400 = missing/empty target query parameter. Auth-wrapped
463+
// like the rest of `/internal/*` because indirectly probing arbitrary
464+
// node IDs through a member is a directory-enumeration vector.
465+
func (s *distHTTPServer) registerProbe(dm *DistMemory) {
466+
s.app.Get("/internal/probe", s.wrapAuth(func(fctx fiber.Ctx) error {
467+
target := fctx.Query("target")
468+
if target == "" {
469+
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: "missing target"})
470+
}
471+
472+
transport := dm.loadTransport()
473+
if transport == nil {
474+
return fctx.SendStatus(fiber.StatusServiceUnavailable)
475+
}
476+
477+
err := transport.Health(s.ctx, target)
478+
if err != nil {
479+
if errors.Is(err, sentinel.ErrBackendNotFound) {
480+
return fctx.SendStatus(fiber.StatusNotFound)
481+
}
482+
483+
return fctx.SendStatus(fiber.StatusBadGateway)
484+
}
485+
486+
return fctx.SendString("ok")
487+
}))
488+
}
489+
437490
func (s *distHTTPServer) registerMerkle(dm *DistMemory) {
438491
s.app.Get("/internal/merkle", s.wrapAuth(func(fctx fiber.Ctx) error {
439492
tree := dm.BuildMerkleTree()

pkg/backend/dist_http_transport.go

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backend
22

33
import (
44
"bytes"
5+
"compress/gzip"
56
"context"
67
"crypto/tls"
78
"io"
@@ -30,6 +31,10 @@ type DistHTTPTransport struct {
3031
// lives on distHTTPServer; the two share the same DistHTTPAuth
3132
// struct when constructed via NewDistHTTPTransportWithAuth.
3233
auth DistHTTPAuth
34+
// compressionThreshold is the body-size byte threshold above
35+
// which Set request bodies are gzip-compressed. <=0 disables —
36+
// matches the pre-Phase-B wire format byte-for-byte.
37+
compressionThreshold int
3338
}
3439

3540
const statusThreshold = 300
@@ -105,10 +110,11 @@ func NewDistHTTPTransportWithAuth(limits DistHTTPLimits, auth DistHTTPAuth, reso
105110
}
106111

107112
return &DistHTTPTransport{
108-
client: client,
109-
baseURLFn: resolver,
110-
respBodyLimit: limits.ResponseLimit,
111-
auth: auth,
113+
client: client,
114+
baseURLFn: resolver,
115+
respBodyLimit: limits.ResponseLimit,
116+
auth: auth,
117+
compressionThreshold: limits.CompressionThreshold,
112118
}
113119
}
114120

@@ -162,14 +168,23 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item
162168
return ewrap.Wrap(err, "marshal set request")
163169
}
164170

171+
reqBodyReader, gzipped, err := t.maybeGzip(payloadBytes)
172+
if err != nil {
173+
return ewrap.Wrap(err, "gzip set body")
174+
}
175+
165176
// prefer canonical endpoint; legacy /internal/cache/set still served
166-
hreq, err := t.newNodeRequest(ctx, http.MethodPost, nodeID, "/internal/set", nil, bytes.NewReader(payloadBytes))
177+
hreq, err := t.newNodeRequest(ctx, http.MethodPost, nodeID, "/internal/set", nil, reqBodyReader)
167178
if err != nil {
168179
return ewrap.Wrap(err, errMsgNewRequest)
169180
}
170181

171182
hreq.Header.Set("Content-Type", "application/json")
172183

184+
if gzipped {
185+
hreq.Header.Set("Content-Encoding", "gzip")
186+
}
187+
173188
resp, err := t.doTrusted(hreq)
174189
if err != nil {
175190
return err
@@ -334,6 +349,37 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error {
334349
return nil
335350
}
336351

352+
// IndirectHealth asks the relay node to probe the target on this
353+
// caller's behalf. The dist HTTP server's `/internal/probe?target=<id>`
354+
// endpoint runs a Health() call on its own transport and returns 200 if
355+
// the target is reachable from the relay's vantage point. Used by the
356+
// SWIM indirect-probe path to filter caller-side network blips before
357+
// marking a peer suspect.
358+
func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error {
359+
hreq, err := t.newNodeRequest(ctx, http.MethodGet, relayNodeID, "/internal/probe",
360+
url.Values{"target": []string{targetNodeID}}, nil)
361+
if err != nil {
362+
return ewrap.Wrap(err, errMsgNewRequest)
363+
}
364+
365+
resp, err := t.doTrusted(hreq)
366+
if err != nil {
367+
return err
368+
}
369+
370+
defer drainBody(t.limitedBody(resp))
371+
372+
if resp.StatusCode == http.StatusNotFound {
373+
return sentinel.ErrBackendNotFound
374+
}
375+
376+
if resp.StatusCode >= statusThreshold {
377+
return ewrap.Newf("indirect probe status %d", resp.StatusCode)
378+
}
379+
380+
return nil
381+
}
382+
337383
// FetchMerkle retrieves a Merkle tree snapshot from a remote node.
338384
func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) {
339385
if t == nil {
@@ -416,6 +462,36 @@ func (t *DistHTTPTransport) limitedBody(resp *http.Response) io.ReadCloser {
416462
return http.MaxBytesReader(nil, resp.Body, t.respBodyLimit)
417463
}
418464

465+
// maybeGzip returns a reader for the request body and a boolean
466+
// indicating whether the body was gzip-compressed. Compression
467+
// applies when compressionThreshold > 0 and the payload exceeds it.
468+
// Below the threshold the original bytes round-trip unchanged so
469+
// peers without compression support remain compatible. Errors come
470+
// only from the gzip writer (which closes around an in-memory
471+
// buffer, so they are practically impossible) — propagated for
472+
// completeness.
473+
func (t *DistHTTPTransport) maybeGzip(payload []byte) (io.Reader, bool, error) {
474+
if t.compressionThreshold <= 0 || len(payload) <= t.compressionThreshold {
475+
return bytes.NewReader(payload), false, nil
476+
}
477+
478+
var buf bytes.Buffer
479+
480+
gz := gzip.NewWriter(&buf)
481+
482+
_, writeErr := gz.Write(payload)
483+
if writeErr != nil {
484+
return nil, false, ewrap.Wrap(writeErr, "gzip write")
485+
}
486+
487+
closeErr := gz.Close()
488+
if closeErr != nil {
489+
return nil, false, ewrap.Wrap(closeErr, "gzip close")
490+
}
491+
492+
return &buf, true, nil
493+
}
494+
419495
func (t *DistHTTPTransport) resolveBaseURL(nodeID string) (*url.URL, error) {
420496
if t == nil || t.baseURLFn == nil {
421497
return nil, errNoTransport

0 commit comments

Comments
 (0)