Skip to content

Commit ee12a3a

Browse files
committed
feat(swim): add self-refutation and cross-process HTTP gossip dissemination
Implement SWIM self-refute so a node receiving a suspect/dead claim about itself at incarnation >= local bumps its incarnation and re-marks Alive, propagating the refutation cluster-wide via higher-incarnation-wins. Wire cross-process gossip over HTTP: - Add Gossip(ctx, targetID, members) to DistTransport interface - Add POST /internal/gossip server endpoint (auth-wrapped) - Introduce GossipMember wire DTO with projection helpers - runGossipTick now falls through to the HTTP transport for non-InProcessTransport clusters (previously a no-op) Swap encoding/json for github.com/goccy/go-json in the server binary. Remove the experimental qualifier from heartbeat/failure-detection docs — indirect probes (Phase B.1) and self-refutation (Phase E) together provide the SWIM guarantees the marker was tracking. Tests: TestDistSWIM_HTTPGossipExchange, TestDistSWIM_SelfRefute
1 parent a0dee60 commit ee12a3a

9 files changed

Lines changed: 506 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,36 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

99
### Added
1010

11+
- **SWIM self-refutation + cross-process gossip dissemination.**
12+
Closes the last `experimental` marker on the heartbeat path.
13+
Three pieces:
14+
- **`acceptGossip` self-refute** — incoming entries that
15+
reference the local node as Suspect or Dead at incarnation
16+
≥ ours now bump the local incarnation and re-mark Alive.
17+
Higher-incarnation-wins propagation in the same function
18+
disseminates the refutation cluster-wide, so a falsely-
19+
suspected node can clear suspicion through gossip alone
20+
(pre-fix the only path was a fresh probe).
21+
- **HTTP gossip wire** — new `Gossip(ctx, targetID, members)`
22+
method on `DistTransport`, new
23+
`POST /internal/gossip` server endpoint (auth-wrapped),
24+
new `GossipMember` wire DTO. `runGossipTick` now falls
25+
through to the HTTP path when the transport isn't an
26+
`InProcessTransport`, so cross-process clusters disseminate
27+
membership state — pre-Phase-E this was an in-process-only
28+
no-op.
29+
- The `experimental` qualifier is removed from
30+
`heartbeatLoop`'s comment + the heartbeat-section field
31+
doc; SWIM-style indirect probes (Phase B.1) and
32+
self-refutation (this round) together provide the SWIM
33+
properties the marker was tracking.
34+
Regression coverage at
35+
[tests/integration/dist_swim_refute_test.go](tests/integration/dist_swim_refute_test.go):
36+
`TestDistSWIM_HTTPGossipExchange` exercises the wire (A pushes
37+
membership to B over HTTP; B's view converges),
38+
`TestDistSWIM_SelfRefute` drives a forged "you are suspect"
39+
gossip into a node's `/internal/gossip` and asserts the local
40+
incarnation bumps + state returns to Alive.
1141
- **End-to-end resilience test** at
1242
[scripts/tests/20-test-cluster-resilience.sh](scripts/tests/20-test-cluster-resilience.sh)
1343
— kills a docker container mid-run, asserts the surviving 4

cmd/hypercache-server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package main
1919
import (
2020
"context"
2121
"encoding/base64"
22-
"encoding/json"
2322
"errors"
2423
"fmt"
2524
"log/slog"
@@ -31,6 +30,7 @@ import (
3130
"syscall"
3231
"time"
3332

33+
"github.com/goccy/go-json"
3434
fiber "github.com/gofiber/fiber/v3"
3535

3636
"github.com/hyp3rd/hypercache"

cmd/hypercache-server/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package main
22

33
import (
4-
"encoding/json"
54
"io"
65
"net/http"
76
"net/http/httptest"
87
"strings"
98
"testing"
109

10+
"github.com/goccy/go-json"
1111
fiber "github.com/gofiber/fiber/v3"
1212
)
1313

hypercache-server

688 Bytes
Binary file not shown.

pkg/backend/dist_http_server.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error {
354354
s.registerHealth(dm)
355355
s.registerDrain(dm)
356356
s.registerProbe(dm)
357+
s.registerGossip(dm)
357358
s.registerMerkle(dm)
358359

359360
return s.listen(bindCtx)
@@ -516,6 +517,30 @@ func (s *distHTTPServer) registerProbe(dm *DistMemory) {
516517
}))
517518
}
518519

520+
// registerGossip wires `POST /internal/gossip` — the SWIM
521+
// membership-dissemination endpoint. The body is a JSON array of
522+
// GossipMember snapshots; the receiver's acceptGossip merges them
523+
// via higher-incarnation-wins and self-refutes if any entry
524+
// claims this node is suspect or dead.
525+
//
526+
// Auth-wrapped like the rest of `/internal/*` because gossip can
527+
// inject membership state — an unauthenticated peer could mark
528+
// real nodes as dead by spoofing a high-incarnation snapshot.
529+
func (s *distHTTPServer) registerGossip(dm *DistMemory) {
530+
s.app.Post("/internal/gossip", s.wrapAuth(func(fctx fiber.Ctx) error {
531+
var members []GossipMember
532+
533+
err := json.Unmarshal(fctx.Body(), &members)
534+
if err != nil {
535+
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: err.Error()})
536+
}
537+
538+
dm.acceptGossip(gossipMembersToNodes(members))
539+
540+
return fctx.SendStatus(fiber.StatusOK)
541+
}))
542+
}
543+
519544
func (s *distHTTPServer) registerMerkle(dm *DistMemory) {
520545
s.app.Get("/internal/merkle", s.wrapAuth(func(fctx fiber.Ctx) error {
521546
tree := dm.BuildMerkleTree()

pkg/backend/dist_http_transport.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,47 @@ func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, tar
380380
return nil
381381
}
382382

383+
// Gossip pushes a member-list snapshot to the target's
384+
// `/internal/gossip` endpoint. The receiver merges via
385+
// higher-incarnation-wins and may self-refute if the snapshot
386+
// claims it's suspect — see acceptGossip + refuteIfSuspected.
387+
//
388+
// The body is a JSON array of GossipMember; the wire shape is
389+
// stable (separate type from cluster.Node) so the cluster
390+
// package can add internal fields without breaking peers running
391+
// older binaries.
392+
func (t *DistHTTPTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error {
393+
payload, err := json.Marshal(members)
394+
if err != nil {
395+
return ewrap.Wrap(err, "marshal gossip payload")
396+
}
397+
398+
hreq, err := t.newNodeRequest(ctx, http.MethodPost, targetNodeID, "/internal/gossip",
399+
nil, bytes.NewReader(payload))
400+
if err != nil {
401+
return ewrap.Wrap(err, errMsgNewRequest)
402+
}
403+
404+
hreq.Header.Set("Content-Type", "application/json")
405+
406+
resp, err := t.doTrusted(hreq)
407+
if err != nil {
408+
return err
409+
}
410+
411+
defer drainBody(t.limitedBody(resp))
412+
413+
if resp.StatusCode == http.StatusNotFound {
414+
return sentinel.ErrBackendNotFound
415+
}
416+
417+
if resp.StatusCode >= statusThreshold {
418+
return ewrap.Newf("gossip status %d", resp.StatusCode)
419+
}
420+
421+
return nil
422+
}
423+
383424
// FetchMerkle retrieves a Merkle tree snapshot from a remote node.
384425
func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) {
385426
if t == nil {

pkg/backend/dist_memory.go

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ type DistMemory struct {
7575
nodeID string
7676
seeds []string // static seed node addresses
7777

78-
// heartbeat / failure detection (experimental)
78+
// heartbeat / failure detection. Phase E added SWIM
79+
// self-refutation (refuteIfSuspected) and HTTP gossip
80+
// dissemination, retiring the prior "experimental" marker —
81+
// the path now disseminates suspect/dead transitions across
82+
// the cluster and lets a falsely-accused node bump its
83+
// incarnation to clear suspicion.
7984
hbInterval time.Duration
8085
hbSuspectAfter time.Duration
8186
hbDeadAfter time.Duration
@@ -3023,19 +3028,32 @@ func (dm *DistMemory) runGossipTick() {
30233028
}
30243029

30253030
target := candidates[idxBig.Int64()]
3031+
transport := dm.loadTransport()
3032+
snapshot := dm.membership.List()
30263033

3027-
ip, ok := dm.loadTransport().(*InProcessTransport)
3028-
if !ok {
3029-
return
3030-
}
3034+
// In-process fast path: skip the wire and call acceptGossip
3035+
// directly. Pre-Phase-E this was the ONLY path; the function
3036+
// bailed for any other transport type, so cross-process
3037+
// clusters never disseminated membership / never refuted
3038+
// suspect claims. The fall-through below now uses the
3039+
// transport's Gossip method, which routes via HTTP for the
3040+
// auto-created DistHTTPTransport.
3041+
if ip, ok := transport.(*InProcessTransport); ok {
3042+
if remote, ok2 := ip.backends[string(target.ID)]; ok2 {
3043+
remote.acceptGossip(snapshot)
3044+
}
30313045

3032-
remote, ok2 := ip.backends[string(target.ID)]
3033-
if !ok2 {
30343046
return
30353047
}
30363048

3037-
snapshot := dm.membership.List()
3038-
remote.acceptGossip(snapshot)
3049+
gossipErr := transport.Gossip(dm.lifeCtx, string(target.ID), nodesToGossipMembers(snapshot))
3050+
if gossipErr != nil {
3051+
dm.logger.Debug(
3052+
"gossip push failed",
3053+
slog.String("peer_id", string(target.ID)),
3054+
slog.Any("err", gossipErr),
3055+
)
3056+
}
30393057
}
30403058

30413059
func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
@@ -3045,6 +3063,8 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
30453063

30463064
for _, node := range nodes {
30473065
if node.ID == dm.localNode.ID {
3066+
dm.refuteIfSuspected(node)
3067+
30483068
continue
30493069
}
30503070

@@ -3079,6 +3099,41 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
30793099
}
30803100
}
30813101

3102+
// refuteIfSuspected handles the SWIM self-refute path: when a peer
3103+
// gossips that THIS node is Suspect or Dead at incarnation N, bump
3104+
// our local incarnation to N+1 and re-upsert ourselves as Alive.
3105+
// Higher-incarnation-wins propagation in `acceptGossip` ensures the
3106+
// next gossip tick disseminates the refutation cluster-wide.
3107+
//
3108+
// Pre-fix this path was a no-op (`continue` on local-ID match) — a
3109+
// node that fell briefly behind heartbeat would be marked Suspect by
3110+
// peers and could not undo it through gossip; only a fresh probe
3111+
// would clear suspicion. Self-refute closes the loop required for
3112+
// the heartbeat marker to drop its `experimental` qualifier.
3113+
func (dm *DistMemory) refuteIfSuspected(claim *cluster.Node) {
3114+
if claim == nil || dm.localNode == nil {
3115+
return
3116+
}
3117+
3118+
if claim.State == cluster.NodeAlive {
3119+
return // peer agrees we're alive — nothing to refute
3120+
}
3121+
3122+
// Only refute when the peer's claim is at >= our incarnation;
3123+
// older claims are stale and ignored.
3124+
if claim.Incarnation < dm.localNode.Incarnation {
3125+
return
3126+
}
3127+
3128+
dm.membership.Mark(dm.localNode.ID, cluster.NodeAlive)
3129+
3130+
dm.logger.Info(
3131+
"self-refuted suspect/dead claim from peer",
3132+
slog.Uint64("claim_incarnation", claim.Incarnation),
3133+
slog.String("claim_state", claim.State.String()),
3134+
)
3135+
}
3136+
30823137
// chooseNewer picks the item with higher version; on version tie uses lexicographically smaller Origin as winner.
30833138
func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item {
30843139
if itemA == nil {
@@ -3265,7 +3320,10 @@ func parseSeedSpec(raw string) seedSpec {
32653320
return seedSpec{id: id, addr: addr}
32663321
}
32673322

3268-
// heartbeatLoop probes peers and updates membership (best-effort experimental).
3323+
// heartbeatLoop probes peers and updates membership. SWIM-style
3324+
// indirect probes (Phase B.1) and self-refutation via gossip
3325+
// (Phase E) are wired into the surrounding helpers — this loop
3326+
// only schedules the per-tick work.
32693327
func (dm *DistMemory) heartbeatLoop(ctx context.Context, stopCh <-chan struct{}) { // reduced cognitive complexity via helpers
32703328
ticker := time.NewTicker(dm.hbInterval)
32713329
defer ticker.Stop()

pkg/backend/dist_transport.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package backend
33
import (
44
"context"
55
"sync"
6+
"time"
67

8+
"github.com/hyp3rd/hypercache/internal/cluster"
79
"github.com/hyp3rd/hypercache/internal/sentinel"
810
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
911
)
@@ -21,9 +23,87 @@ type DistTransport interface {
2123
// the caller's local network was the issue, not the target.
2224
// Returns nil when the relay reports the target reachable.
2325
IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error
26+
// Gossip pushes the caller's full member-list snapshot to
27+
// `targetNodeID`. The receiver merges it via higher-incarnation-
28+
// wins and self-refutes if the snapshot claims it is suspect.
29+
// Used by the cross-process gossip path; in-process clusters
30+
// short-circuit to a direct method call instead.
31+
Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error
2432
FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error)
2533
}
2634

35+
// GossipMember is the wire-friendly snapshot of a cluster.Node used
36+
// by the Gossip transport method. Stays a separate struct from
37+
// cluster.Node so the wire schema doesn't drift when the cluster
38+
// package adds internal fields.
39+
type GossipMember struct {
40+
ID string `json:"id"`
41+
Address string `json:"address"`
42+
State string `json:"state"`
43+
Incarnation uint64 `json:"incarnation"`
44+
}
45+
46+
// nodesToGossipMembers projects a cluster.Node snapshot down to the
47+
// wire shape. Nil entries are dropped — they shouldn't appear in
48+
// practice but the projection is defensive.
49+
func nodesToGossipMembers(nodes []*cluster.Node) []GossipMember {
50+
out := make([]GossipMember, 0, len(nodes))
51+
52+
for _, n := range nodes {
53+
if n == nil {
54+
continue
55+
}
56+
57+
out = append(out, GossipMember{
58+
ID: string(n.ID),
59+
Address: n.Address,
60+
State: n.State.String(),
61+
Incarnation: n.Incarnation,
62+
})
63+
}
64+
65+
return out
66+
}
67+
68+
// gossipMembersToNodes inflates a wire-shape snapshot back into
69+
// cluster.Node values for handoff to acceptGossip. Unknown state
70+
// strings fall back to NodeAlive — the receiver's
71+
// higher-incarnation-wins logic still applies, and a stuck-suspect
72+
// claim from a peer running an older state vocabulary degrades
73+
// gracefully to alive-at-this-incarnation.
74+
func gossipMembersToNodes(members []GossipMember) []*cluster.Node {
75+
out := make([]*cluster.Node, 0, len(members))
76+
77+
for _, m := range members {
78+
out = append(out, &cluster.Node{
79+
ID: cluster.NodeID(m.ID),
80+
Address: m.Address,
81+
State: parseGossipState(m.State),
82+
Incarnation: m.Incarnation,
83+
LastSeen: time.Now(),
84+
})
85+
}
86+
87+
return out
88+
}
89+
90+
// parseGossipState maps the wire state string back to the
91+
// internal NodeState enum. "alive" and unknown values both
92+
// resolve to NodeAlive (defensive — see gossipMembersToNodes);
93+
// the explicit "alive" branch is omitted to satisfy the
94+
// identical-switch-branches lint while keeping the same
95+
// semantic.
96+
func parseGossipState(s string) cluster.NodeState {
97+
switch s {
98+
case "suspect":
99+
return cluster.NodeSuspect
100+
case "dead":
101+
return cluster.NodeDead
102+
default:
103+
return cluster.NodeAlive
104+
}
105+
}
106+
27107
// InProcessTransport implements DistTransport for multiple DistMemory instances in the same process.
28108
type InProcessTransport struct {
29109
mu sync.RWMutex
@@ -119,6 +199,22 @@ func (t *InProcessTransport) IndirectHealth(ctx context.Context, relayNodeID, ta
119199
return rt.Health(ctx, targetNodeID)
120200
}
121201

202+
// Gossip delivers the snapshot directly to the target backend's
203+
// acceptGossip — this is the in-process equivalent of the HTTP
204+
// `/internal/gossip` endpoint, with the type translation done
205+
// inline so the rest of the SWIM machinery can stay agnostic to
206+
// transport choice.
207+
func (t *InProcessTransport) Gossip(_ context.Context, targetNodeID string, members []GossipMember) error {
208+
target, ok := t.lookup(targetNodeID)
209+
if !ok {
210+
return sentinel.ErrBackendNotFound
211+
}
212+
213+
target.acceptGossip(gossipMembersToNodes(members))
214+
215+
return nil
216+
}
217+
122218
// FetchMerkle fetches a remote merkle tree.
123219
func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error) {
124220
b, ok := t.lookup(nodeID)

0 commit comments

Comments
 (0)