-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdist_swim_refute_test.go
More file actions
244 lines (196 loc) · 6.97 KB
/
dist_swim_refute_test.go
File metadata and controls
244 lines (196 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package integration
import (
"context"
"net/http"
"strconv"
"strings"
"testing"
"time"
"github.com/goccy/go-json"
"github.com/hyp3rd/hypercache/internal/cluster"
"github.com/hyp3rd/hypercache/pkg/backend"
)
// TestDistSWIM_HTTPGossipExchange exercises the HTTP gossip wire end
// to end: the sender's membership snapshot is pushed to the receiver
// through a DistHTTPTransport, and the receiver is expected to merge
// the entries it did not know about.
//
// Pre-Phase-E this path was a no-op — runGossipTick only worked
// when the transport was an InProcessTransport, so cross-process
// clusters got no membership dissemination at all.
func TestDistSWIM_HTTPGossipExchange(t *testing.T) {
	t.Parallel()

	const (
		senderID   = "swim-A"
		receiverID = "swim-B"
		ghostID    = "swim-ghost"
	)

	senderAddr := allocatePort(t)
	receiverAddr := allocatePort(t)

	sender := mustGossipNode(t, senderID, senderAddr, []string{receiverID + "@" + receiverAddr})
	receiver := mustGossipNode(t, receiverID, receiverAddr, []string{senderID + "@" + senderAddr})

	// Only the sender learns about the synthetic third node; the
	// receiver can discover it through gossip alone.
	sender.Membership().Upsert(cluster.NewNode(ghostID, "127.0.0.1:1"))

	if memberExists(receiver, ghostID) {
		t.Fatalf("test setup invariant broken: B already sees ghost before gossip")
	}

	// Ship the sender's snapshot to the receiver over HTTP.
	httpTransport, ok := getTransport(sender)
	if !ok {
		t.Fatalf("node A's transport is not a *DistHTTPTransport")
	}

	snapshot := snapshotMembers(sender)
	if err := httpTransport.Gossip(context.Background(), receiverID, snapshot); err != nil {
		t.Fatalf("gossip A→B: %v", err)
	}

	// The merged gossip must have introduced the ghost to the receiver.
	if memberExists(receiver, ghostID) {
		return
	}

	t.Fatalf("expected B to see ghost after gossip; current view: %v", listMemberIDs(receiver))
}
// TestDistSWIM_SelfRefute pins the self-refutation contract:
// when a peer's gossip claims this node is Suspect at incarnation
// N (>= local incarnation), the local node bumps its incarnation
// and re-marks itself Alive — so subsequent gossip ticks
// disseminate the refutation cluster-wide.
//
// Pre-Phase-E `acceptGossip` skipped entries about the local node
// (`continue` on ID match), so a falsely-suspected node could
// never clear suspicion via gossip; only a fresh probe could.
func TestDistSWIM_SelfRefute(t *testing.T) {
	t.Parallel()

	const selfID = "swim-self"

	addr := allocatePort(t)
	dm := mustGossipNode(t, selfID, addr, nil)

	// Locate the local node by ID instead of assuming it sits at
	// index 0: a positional read panics on an empty list and would
	// silently pick up a different node's incarnation if the
	// ordering ever changed.
	before := dm.Membership().List()
	selfIdx := -1

	for i := range before {
		if string(before[i].ID) == selfID {
			selfIdx = i

			break
		}
	}

	if selfIdx < 0 {
		t.Fatalf("local node %q missing from membership before gossip", selfID)
	}

	initialIncarnation := before[selfIdx].Incarnation

	// Forge a peer's gossip view: "swim-self is Suspect at the
	// current incarnation". This is what a peer would say after
	// a heartbeat probe failure (Phase B.1 path).
	suspectClaim := []backend.GossipMember{
		{
			ID:          selfID,
			Address:     addr,
			State:       "suspect",
			Incarnation: initialIncarnation,
		},
	}

	// Drive the wire: post the gossip directly via the receiver's
	// /internal/gossip endpoint so the assertion exercises the
	// production code path (acceptGossip via decodeGetBody-style
	// JSON decode), not just the in-memory function.
	postGossip(t, addr, suspectClaim)

	// After accepting the suspect claim, the local node must have
	// bumped its incarnation AND be back in NodeAlive state.
	for _, n := range dm.Membership().List() {
		if string(n.ID) != selfID {
			continue
		}

		if n.Incarnation <= initialIncarnation {
			t.Fatalf("expected incarnation > %d after self-refute, got %d", initialIncarnation, n.Incarnation)
		}

		if n.State != cluster.NodeAlive {
			t.Fatalf("expected NodeAlive after self-refute, got %s", n.State.String())
		}

		return
	}

	t.Fatalf("local node missing from membership after gossip")
}
// mustGossipNode is the shared node constructor for the SWIM tests.
// It builds a dist-memory backend for (id, addr) with the given seeds,
// replication 1, 8 virtual nodes and heartbeat settings of 5s/15s/30s,
// fails the test if construction or the *backend.DistMemory cast
// fails, registers a cleanup that stops the node, and waits on
// waitForDistNodeHealth for addr before returning.
//
// The concrete *backend.DistMemory is returned so tests can poke at
// membership directly.
func mustGossipNode(t *testing.T, id, addr string, seeds []string) *backend.DistMemory {
	t.Helper()

	ctx := context.Background()

	created, err := backend.NewDistMemory(
		ctx,
		backend.WithDistNode(id, addr),
		backend.WithDistSeeds(seeds),
		backend.WithDistReplication(1),
		backend.WithDistVirtualNodes(8),
		backend.WithDistHeartbeat(5*time.Second, 15*time.Second, 30*time.Second),
	)
	if err != nil {
		t.Fatalf("new node %s: %v", id, err)
	}

	node, isDist := created.(*backend.DistMemory)
	if !isDist {
		t.Fatalf("cast %s: %T", id, created)
	}

	// Best-effort shutdown: a Stop error at teardown is deliberately
	// ignored so it cannot mask the test's real outcome.
	t.Cleanup(func() {
		_ = node.Stop(ctx)
	})

	waitForDistNodeHealth(ctx, t, addr)

	return node
}
// memberExists scans the node's current membership list and reports
// whether any entry carries the given ID.
func memberExists(dm *backend.DistMemory, id string) bool {
	nodes := dm.Membership().List()

	for i := range nodes {
		if id == string(nodes[i].ID) {
			return true
		}
	}

	return false
}
// listMemberIDs returns the IDs of every entry in the node's
// membership list, in list order. It exists to make failure messages
// readable.
func listMemberIDs(dm *backend.DistMemory) []string {
	nodes := dm.Membership().List()
	ids := make([]string, len(nodes))

	for i := range nodes {
		ids[i] = string(nodes[i].ID)
	}

	return ids
}
// snapshotMembers converts the node's membership list into the
// transport's wire shape, one backend.GossipMember per entry (ID,
// address, state name, incarnation). Per the original note, this
// mirrors the conversion the production runGossipTick uses for the
// HTTP path.
func snapshotMembers(dm *backend.DistMemory) []backend.GossipMember {
	nodes := dm.Membership().List()
	wire := make([]backend.GossipMember, len(nodes))

	for i := range nodes {
		wire[i] = backend.GossipMember{
			ID:          string(nodes[i].ID),
			Address:     nodes[i].Address,
			State:       nodes[i].State.String(),
			Incarnation: nodes[i].Incarnation,
		}
	}

	return wire
}
// getTransport hands back an HTTP transport the test can call Gossip
// on. Test-only: there is no public accessor for the transport the
// node created for itself (production keeps it behind an
// atomic.Pointer slot), so this builds a fresh DistHTTPTransport whose
// resolver maps a node ID to "http://<address>" using dm's current
// membership list. The boolean result is always true.
func getTransport(dm *backend.DistMemory) (*backend.DistHTTPTransport, bool) {
	// resolveBaseURL looks the node up on every call, so membership
	// changes made after construction are visible to the transport.
	resolveBaseURL := func(nodeID string) (string, bool) {
		nodes := dm.Membership().List()

		for i := range nodes {
			if string(nodes[i].ID) != nodeID {
				continue
			}

			return "http://" + nodes[i].Address, true
		}

		return "", false
	}

	fresh := backend.NewDistHTTPTransport(0, resolveBaseURL)

	return fresh, true
}
// postGossip drives the dist HTTP server's `/internal/gossip`
// endpoint directly with a JSON-encoded snapshot — same wire
// shape an HTTP gossip producer would send.
//
// The test fails if the members cannot be marshalled, the request
// cannot be built or sent, the round trip exceeds the timeout, or the
// server answers with anything other than 200 OK.
func postGossip(t *testing.T, addr string, members []backend.GossipMember) {
	t.Helper()

	// http.DefaultClient has no timeout of its own, so the request
	// carries a deadline: an unresponsive receiver fails the test
	// promptly instead of blocking until the global test timeout.
	const gossipPostTimeout = 5 * time.Second

	body, err := json.Marshal(members)
	if err != nil {
		t.Fatalf("marshal gossip: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), gossipPostTimeout)
	defer cancel()

	endpoint := "http://" + addr + "/internal/gossip"

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, strings.NewReader(string(body)))
	if err != nil {
		t.Fatalf("build req: %v", err)
	}

	req.Header.Set("Content-Type", "application/json")
	// net/http already derives the request length from the
	// strings.Reader body; the explicit header is retained as-is.
	req.Header.Set("Content-Length", strconv.Itoa(len(body)))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("post gossip: %v", err)
	}

	// t.Fatalf exits via runtime.Goexit, which still runs deferred
	// calls, so the body is closed on the failure path too.
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		t.Fatalf("gossip post status=%d", resp.StatusCode)
	}
}