Skip to content

Commit f526157

Browse files
authored
Merge pull request #111 from hyp3rd/feat/dist-mem-cache
Feat/dist mem cache
2 parents 5de2daf + ee12a3a commit f526157

13 files changed

Lines changed: 780 additions & 14 deletions

File tree

.github/workflows/cluster.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ jobs:
4444
- name: Run cross-node smoke
4545
run: bash scripts/tests/10-test-cluster-api.sh
4646

47+
- name: Run resilience test (kill + restart a node)
48+
# The smoke phase above already validated propagation;
49+
# this phase validates that the cluster keeps serving
50+
# writes when a node is down and that the resurrected
51+
# node converges back. Catches regressions in the
52+
# hint-replay / anti-entropy paths under the actual
53+
# docker network — a class of bugs in-process tests
54+
# cannot reach.
55+
run: bash scripts/tests/20-test-cluster-resilience.sh
56+
4757
- name: Dump container logs (on failure)
4858
if: failure()
4959
run: |

CHANGELOG.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,48 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

99
### Added
1010

11+
- **SWIM self-refutation + cross-process gossip dissemination.**
12+
Closes the last `experimental` marker on the heartbeat path.
13+
Three pieces:
14+
- **`acceptGossip` self-refute** — incoming entries that
15+
reference the local node as Suspect or Dead at incarnation
16+
≥ ours now bump the local incarnation and re-mark Alive.
17+
Higher-incarnation-wins propagation in the same function
18+
disseminates the refutation cluster-wide, so a falsely-
19+
suspected node can clear suspicion through gossip alone
20+
(pre-fix the only path was a fresh probe).
21+
- **HTTP gossip wire** — new `Gossip(ctx, targetID, members)`
22+
method on `DistTransport`, new
23+
`POST /internal/gossip` server endpoint (auth-wrapped),
24+
new `GossipMember` wire DTO. `runGossipTick` now falls
25+
through to the HTTP path when the transport isn't an
26+
`InProcessTransport`, so cross-process clusters disseminate
27+
membership state — pre-Phase-E this was an in-process-only
28+
no-op.
29+
- The `experimental` qualifier is removed from
30+
`heartbeatLoop`'s comment + the heartbeat-section field
31+
doc; SWIM-style indirect probes (Phase B.1) and
32+
self-refutation (this round) together provide the SWIM
33+
properties the marker was tracking.
34+
Regression coverage at
35+
[tests/integration/dist_swim_refute_test.go](tests/integration/dist_swim_refute_test.go):
36+
`TestDistSWIM_HTTPGossipExchange` exercises the wire (A pushes
37+
membership to B over HTTP; B's view converges),
38+
`TestDistSWIM_SelfRefute` drives a forged "you are suspect"
39+
gossip into a node's `/internal/gossip` and asserts the local
40+
incarnation bumps + state returns to Alive.
41+
- **End-to-end resilience test** at
42+
[scripts/tests/20-test-cluster-resilience.sh](scripts/tests/20-test-cluster-resilience.sh)
43+
— kills a docker container mid-run, asserts the surviving 4
44+
nodes still serve every previously-written key AND every key
45+
written during the outage, then restarts the killed node and
46+
asserts it converges on the full state within 60 s. Validates
47+
Phase B.2 (hint-replay) and the post-restart anti-entropy
48+
paths against the *actual* docker network — a class of bugs
49+
in-process tests can't reach. 24 assertions across 6 phases.
50+
Wired into both `make test-cluster` (runs after the smoke,
51+
exit-code-propagated through the same teardown trap) and the
52+
`cluster` CI workflow as a follow-up step.
1153
- **Cross-process cluster smoke in CI**
1254
[.github/workflows/cluster.yml](.github/workflows/cluster.yml) boots
1355
the 5-node `docker-compose.cluster.yml` stack on every PR/push,

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,15 @@ stop-dev-cluster:
4646
# initial review (factory dropped options, seeds without IDs,
4747
# json.RawMessage on non-owner GET).
4848
test-cluster: stop-dev-cluster
49-
@echo "spinning up cluster + running cross-node smoke"
49+
@echo "spinning up cluster + running cross-node smoke + resilience"
5050
@echo
5151
docker compose -f docker-compose.cluster.yml up --build -d
5252
@bash scripts/tests/wait-for-cluster.sh
5353
@rc=0; bash scripts/tests/10-test-cluster-api.sh || rc=$$?; \
54+
if [ $$rc -eq 0 ]; then \
55+
echo ""; echo "smoke ok — running resilience phase"; echo ""; \
56+
bash scripts/tests/20-test-cluster-resilience.sh || rc=$$?; \
57+
fi; \
5458
echo ""; echo "tearing down cluster (rc=$$rc)"; \
5559
docker compose -f docker-compose.cluster.yml down -v --rmi local --remove-orphans >/dev/null 2>&1 || true; \
5660
exit $$rc

cmd/hypercache-server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package main
1919
import (
2020
"context"
2121
"encoding/base64"
22-
"encoding/json"
2322
"errors"
2423
"fmt"
2524
"log/slog"
@@ -31,6 +30,7 @@ import (
3130
"syscall"
3231
"time"
3332

33+
"github.com/goccy/go-json"
3434
fiber "github.com/gofiber/fiber/v3"
3535

3636
"github.com/hyp3rd/hypercache"

cmd/hypercache-server/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package main
22

33
import (
4-
"encoding/json"
54
"io"
65
"net/http"
76
"net/http/httptest"
87
"strings"
98
"testing"
109

10+
"github.com/goccy/go-json"
1111
fiber "github.com/gofiber/fiber/v3"
1212
)
1313

hypercache-server

688 Bytes
Binary file not shown.

pkg/backend/dist_http_server.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error {
354354
s.registerHealth(dm)
355355
s.registerDrain(dm)
356356
s.registerProbe(dm)
357+
s.registerGossip(dm)
357358
s.registerMerkle(dm)
358359

359360
return s.listen(bindCtx)
@@ -516,6 +517,30 @@ func (s *distHTTPServer) registerProbe(dm *DistMemory) {
516517
}))
517518
}
518519

520+
// registerGossip wires `POST /internal/gossip` — the SWIM
521+
// membership-dissemination endpoint. The body is a JSON array of
522+
// GossipMember snapshots; the receiver's acceptGossip merges them
523+
// via higher-incarnation-wins and self-refutes if any entry
524+
// claims this node is suspect or dead.
525+
//
526+
// Auth-wrapped like the rest of `/internal/*` because gossip can
527+
// inject membership state — an unauthenticated peer could mark
528+
// real nodes as dead by spoofing a high-incarnation snapshot.
529+
func (s *distHTTPServer) registerGossip(dm *DistMemory) {
530+
s.app.Post("/internal/gossip", s.wrapAuth(func(fctx fiber.Ctx) error {
531+
var members []GossipMember
532+
533+
err := json.Unmarshal(fctx.Body(), &members)
534+
if err != nil {
535+
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: err.Error()})
536+
}
537+
538+
dm.acceptGossip(gossipMembersToNodes(members))
539+
540+
return fctx.SendStatus(fiber.StatusOK)
541+
}))
542+
}
543+
519544
func (s *distHTTPServer) registerMerkle(dm *DistMemory) {
520545
s.app.Get("/internal/merkle", s.wrapAuth(func(fctx fiber.Ctx) error {
521546
tree := dm.BuildMerkleTree()

pkg/backend/dist_http_transport.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,47 @@ func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, tar
380380
return nil
381381
}
382382

383+
// Gossip pushes a member-list snapshot to the target's
384+
// `/internal/gossip` endpoint. The receiver merges via
385+
// higher-incarnation-wins and may self-refute if the snapshot
386+
// claims it's suspect — see acceptGossip + refuteIfSuspected.
387+
//
388+
// The body is a JSON array of GossipMember; the wire shape is
389+
// stable (separate type from cluster.Node) so the cluster
390+
// package can add internal fields without breaking peers running
391+
// older binaries.
392+
func (t *DistHTTPTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error {
393+
payload, err := json.Marshal(members)
394+
if err != nil {
395+
return ewrap.Wrap(err, "marshal gossip payload")
396+
}
397+
398+
hreq, err := t.newNodeRequest(ctx, http.MethodPost, targetNodeID, "/internal/gossip",
399+
nil, bytes.NewReader(payload))
400+
if err != nil {
401+
return ewrap.Wrap(err, errMsgNewRequest)
402+
}
403+
404+
hreq.Header.Set("Content-Type", "application/json")
405+
406+
resp, err := t.doTrusted(hreq)
407+
if err != nil {
408+
return err
409+
}
410+
411+
defer drainBody(t.limitedBody(resp))
412+
413+
if resp.StatusCode == http.StatusNotFound {
414+
return sentinel.ErrBackendNotFound
415+
}
416+
417+
if resp.StatusCode >= statusThreshold {
418+
return ewrap.Newf("gossip status %d", resp.StatusCode)
419+
}
420+
421+
return nil
422+
}
423+
383424
// FetchMerkle retrieves a Merkle tree snapshot from a remote node.
384425
func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) {
385426
if t == nil {

pkg/backend/dist_memory.go

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ type DistMemory struct {
7575
nodeID string
7676
seeds []string // static seed node addresses
7777

78-
// heartbeat / failure detection (experimental)
78+
// heartbeat / failure detection. Phase E added SWIM
79+
// self-refutation (refuteIfSuspected) and HTTP gossip
80+
// dissemination, retiring the prior "experimental" marker —
81+
// the path now disseminates suspect/dead transitions across
82+
// the cluster and lets a falsely-accused node bump its
83+
// incarnation to clear suspicion.
7984
hbInterval time.Duration
8085
hbSuspectAfter time.Duration
8186
hbDeadAfter time.Duration
@@ -3023,19 +3028,32 @@ func (dm *DistMemory) runGossipTick() {
30233028
}
30243029

30253030
target := candidates[idxBig.Int64()]
3031+
transport := dm.loadTransport()
3032+
snapshot := dm.membership.List()
30263033

3027-
ip, ok := dm.loadTransport().(*InProcessTransport)
3028-
if !ok {
3029-
return
3030-
}
3034+
// In-process fast path: skip the wire and call acceptGossip
3035+
// directly. Pre-Phase-E this was the ONLY path; the function
3036+
// bailed for any other transport type, so cross-process
3037+
// clusters never disseminated membership / never refuted
3038+
// suspect claims. The fall-through below now uses the
3039+
// transport's Gossip method, which routes via HTTP for the
3040+
// auto-created DistHTTPTransport.
3041+
if ip, ok := transport.(*InProcessTransport); ok {
3042+
if remote, ok2 := ip.backends[string(target.ID)]; ok2 {
3043+
remote.acceptGossip(snapshot)
3044+
}
30313045

3032-
remote, ok2 := ip.backends[string(target.ID)]
3033-
if !ok2 {
30343046
return
30353047
}
30363048

3037-
snapshot := dm.membership.List()
3038-
remote.acceptGossip(snapshot)
3049+
gossipErr := transport.Gossip(dm.lifeCtx, string(target.ID), nodesToGossipMembers(snapshot))
3050+
if gossipErr != nil {
3051+
dm.logger.Debug(
3052+
"gossip push failed",
3053+
slog.String("peer_id", string(target.ID)),
3054+
slog.Any("err", gossipErr),
3055+
)
3056+
}
30393057
}
30403058

30413059
func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
@@ -3045,6 +3063,8 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
30453063

30463064
for _, node := range nodes {
30473065
if node.ID == dm.localNode.ID {
3066+
dm.refuteIfSuspected(node)
3067+
30483068
continue
30493069
}
30503070

@@ -3079,6 +3099,41 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
30793099
}
30803100
}
30813101

3102+
// refuteIfSuspected handles the SWIM self-refute path: when a peer
3103+
// gossips that THIS node is Suspect or Dead at incarnation N, bump
3104+
// our local incarnation to N+1 and re-upsert ourselves as Alive.
3105+
// Higher-incarnation-wins propagation in `acceptGossip` ensures the
3106+
// next gossip tick disseminates the refutation cluster-wide.
3107+
//
3108+
// Pre-fix this path was a no-op (`continue` on local-ID match) — a
3109+
// node that fell briefly behind heartbeat would be marked Suspect by
3110+
// peers and could not undo it through gossip; only a fresh probe
3111+
// would clear suspicion. Self-refute closes the loop required for
3112+
// the heartbeat marker to drop its `experimental` qualifier.
3113+
func (dm *DistMemory) refuteIfSuspected(claim *cluster.Node) {
3114+
if claim == nil || dm.localNode == nil {
3115+
return
3116+
}
3117+
3118+
if claim.State == cluster.NodeAlive {
3119+
return // peer agrees we're alive — nothing to refute
3120+
}
3121+
3122+
// Only refute when the peer's claim is at >= our incarnation;
3123+
// older claims are stale and ignored.
3124+
if claim.Incarnation < dm.localNode.Incarnation {
3125+
return
3126+
}
3127+
3128+
dm.membership.Mark(dm.localNode.ID, cluster.NodeAlive)
3129+
3130+
dm.logger.Info(
3131+
"self-refuted suspect/dead claim from peer",
3132+
slog.Uint64("claim_incarnation", claim.Incarnation),
3133+
slog.String("claim_state", claim.State.String()),
3134+
)
3135+
}
3136+
30823137
// chooseNewer picks the item with higher version; on version tie uses lexicographically smaller Origin as winner.
30833138
func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item {
30843139
if itemA == nil {
@@ -3265,7 +3320,10 @@ func parseSeedSpec(raw string) seedSpec {
32653320
return seedSpec{id: id, addr: addr}
32663321
}
32673322

3268-
// heartbeatLoop probes peers and updates membership (best-effort experimental).
3323+
// heartbeatLoop probes peers and updates membership. SWIM-style
3324+
// indirect probes (Phase B.1) and self-refutation via gossip
3325+
// (Phase E) are wired into the surrounding helpers — this loop
3326+
// only schedules the per-tick work.
32693327
func (dm *DistMemory) heartbeatLoop(ctx context.Context, stopCh <-chan struct{}) { // reduced cognitive complexity via helpers
32703328
ticker := time.NewTicker(dm.hbInterval)
32713329
defer ticker.Stop()

0 commit comments

Comments
 (0)