Skip to content

Commit 820b9a3

Browse files
committed
Prevent duplicate peers
1 parent d4c7e7d commit 820b9a3

3 files changed

Lines changed: 138 additions & 41 deletions

File tree

pkg/raft/node.go

Lines changed: 25 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ import (
99
"net"
1010
"os"
1111
"path/filepath"
12-
"slices"
1312
"strings"
1413
"sync/atomic"
1514
"time"
@@ -114,22 +113,25 @@ func (n *Node) Start(_ context.Context) error {
114113
}
115114

116115
n.logger.Info().Msg("Boostrap raft cluster")
116+
thisNode := raft.Server{ID: raft.ServerID(n.config.NodeID), Address: raft.ServerAddress(n.config.RaftAddr)}
117117
cfg := raft.Configuration{
118118
Servers: []raft.Server{
119-
{
120-
ID: raft.ServerID(n.config.NodeID),
121-
Address: raft.ServerAddress(n.config.RaftAddr),
122-
},
119+
thisNode,
123120
},
124121
}
125122
for _, peer := range n.config.Peers {
126123
addr, err := splitPeerAddr(peer)
127124
if err != nil {
128-
return err
125+
return fmt.Errorf("peer %q : %w", peer, err)
126+
}
127+
if addr != thisNode {
128+
cfg.Servers = append(cfg.Servers, addr)
129129
}
130-
cfg.Servers = append(cfg.Servers, addr)
131130
}
132-
cfg.Servers = deduplicateServers(cfg.Servers)
131+
132+
if svrs := deduplicateServers(cfg.Servers); len(svrs) != len(cfg.Servers) {
133+
return fmt.Errorf("duplicate peers found in config: %v", cfg.Servers)
134+
}
133135

134136
if err := n.raft.BootstrapCluster(cfg).Error(); err != nil {
135137
return fmt.Errorf("bootstrap cluster: %w", err)
@@ -138,35 +140,6 @@ func (n *Node) Start(_ context.Context) error {
138140
return nil
139141
}
140142

141-
func (n *Node) awaitToBeClusterMember(ctx context.Context, nodeID raft.ServerID) error {
142-
start := time.Now()
143-
for {
144-
exists, err := n.isClusterMember(nodeID)
145-
if err != nil {
146-
return err
147-
}
148-
if exists {
149-
n.logger.Info().Msgf("node joined cluster after %s", time.Since(start))
150-
return nil
151-
}
152-
select {
153-
case <-ctx.Done():
154-
return ctx.Err()
155-
case <-time.After(time.Second / 10):
156-
}
157-
}
158-
}
159-
160-
func (n *Node) isClusterMember(nodeID raft.ServerID) (bool, error) {
161-
future := n.raft.GetConfiguration()
162-
if err := future.Error(); err != nil {
163-
return false, err
164-
}
165-
return slices.ContainsFunc(future.Configuration().Servers, func(server raft.Server) bool {
166-
return server.ID == nodeID
167-
}), nil
168-
}
169-
170143
func (n *Node) Stop() error {
171144
if n == nil {
172145
return nil
@@ -344,13 +317,26 @@ func splitPeerAddr(peer string) (raft.Server, error) {
344317
if len(parts) != 2 {
345318
return raft.Server{}, errors.New("expecting nodeID@address for peer")
346319
}
320+
321+
nodeID, address := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
322+
323+
if nodeID == "" {
324+
return raft.Server{}, errors.New("nodeID cannot be empty")
325+
}
326+
if address == "" {
327+
return raft.Server{}, errors.New("address cannot be empty")
328+
}
329+
347330
return raft.Server{
348-
ID: raft.ServerID(parts[0]),
349-
Address: raft.ServerAddress(parts[1]),
331+
ID: raft.ServerID(nodeID),
332+
Address: raft.ServerAddress(address),
350333
}, nil
351334
}
352335

353336
func deduplicateServers(servers []raft.Server) []raft.Server {
337+
if len(servers) == 0 {
338+
return []raft.Server{}
339+
}
354340
seen := make(map[raft.ServerID]struct{})
355341
unique := make([]raft.Server, 0, len(servers))
356342
for _, server := range servers {

pkg/raft/node_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package raft
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/hashicorp/raft"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestSplitPeerAddr(t *testing.T) {
13+
specs := map[string]struct {
14+
in string
15+
exp raft.Server
16+
expErr error
17+
}{
18+
"valid": {
19+
in: "node1@127.0.0.1:1234",
20+
exp: raft.Server{ID: raft.ServerID("node1"), Address: raft.ServerAddress("127.0.0.1:1234")},
21+
},
22+
"trims whitespace": {
23+
in: " node2 @ 10.0.0.2:9000 ",
24+
exp: raft.Server{ID: raft.ServerID("node2"), Address: raft.ServerAddress("10.0.0.2:9000")},
25+
},
26+
"missing at": {
27+
in: "node1",
28+
expErr: errors.New("expecting nodeID@address for peer"),
29+
},
30+
"empty node id": {
31+
in: "@127.0.0.1:1234",
32+
expErr: errors.New("nodeID cannot be empty"),
33+
},
34+
"empty address": {
35+
in: "node1@",
36+
expErr: errors.New("address cannot be empty"),
37+
},
38+
"multiple ats": {
39+
in: "a@b@c",
40+
expErr: errors.New("expecting nodeID@address for peer"),
41+
},
42+
"only spaces": {
43+
in: " @ ",
44+
expErr: errors.New("nodeID cannot be empty"),
45+
},
46+
}
47+
48+
for name, spec := range specs {
49+
t.Run(name, func(t *testing.T) {
50+
ctx := t.Context()
51+
_ = ctx // keep to follow guideline to prefer t.Context; function under test doesn't use context
52+
53+
got, err := splitPeerAddr(spec.in)
54+
if spec.expErr != nil {
55+
require.Error(t, err)
56+
assert.Equal(t, spec.expErr.Error(), err.Error())
57+
return
58+
}
59+
require.NoError(t, err)
60+
assert.Equal(t, spec.exp, got)
61+
})
62+
}
63+
}
64+
65+
func TestDeduplicateServers(t *testing.T) {
66+
67+
specs := map[string]struct {
68+
in []raft.Server
69+
exp []raft.Server
70+
}{
71+
"empty": {
72+
in: nil,
73+
exp: []raft.Server{},
74+
},
75+
"no duplicates": {
76+
in: []raft.Server{
77+
{ID: raft.ServerID("n1"), Address: raft.ServerAddress("a1")},
78+
{ID: raft.ServerID("n2"), Address: raft.ServerAddress("a2")},
79+
},
80+
exp: []raft.Server{
81+
{ID: raft.ServerID("n1"), Address: raft.ServerAddress("a1")},
82+
{ID: raft.ServerID("n2"), Address: raft.ServerAddress("a2")},
83+
},
84+
},
85+
"duplicates keep first": {
86+
in: []raft.Server{
87+
{ID: raft.ServerID("n1"), Address: raft.ServerAddress("a1")},
88+
{ID: raft.ServerID("n2"), Address: raft.ServerAddress("a2")},
89+
{ID: raft.ServerID("n1"), Address: raft.ServerAddress("a3")},
90+
{ID: raft.ServerID("n3"), Address: raft.ServerAddress("a4")},
91+
{ID: raft.ServerID("n2"), Address: raft.ServerAddress("a5")},
92+
},
93+
exp: []raft.Server{
94+
{ID: raft.ServerID("n1"), Address: raft.ServerAddress("a1")},
95+
{ID: raft.ServerID("n2"), Address: raft.ServerAddress("a2")},
96+
{ID: raft.ServerID("n3"), Address: raft.ServerAddress("a4")},
97+
},
98+
},
99+
}
100+
101+
for name, spec := range specs {
102+
t.Run(name, func(t *testing.T) {
103+
ctx := t.Context()
104+
_ = ctx
105+
106+
got := deduplicateServers(spec.in)
107+
assert.Equal(t, spec.exp, got)
108+
})
109+
}
110+
}

test/e2e/failover_e2e_test.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
170170
}
171171
}
172172
oldDetails := clusterNodes.Details(oldLeader)
173-
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, jwtSecret, genesisHash, testEndpoints.GetDAAddress(), "", raftClusterRPCs, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
173+
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, jwtSecret, genesisHash, testEndpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
174174
t.Log("Restarted old leader to sync with cluster: " + oldLeader)
175175

176176
if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) {
@@ -355,7 +355,7 @@ func setupRaftSequencerNode(
355355
t *testing.T,
356356
sut *SystemUnderTest,
357357
workDir, nodeID, raftAddr, jwtSecret, genesisHash, daAddress, bootstrapDir string,
358-
raftPeers []string,
358+
allRaftClusterMembers []string,
359359
p2pPeers, rpcAddr, p2pAddr, engineURL, ethURL string,
360360
bootstrap bool,
361361
passphraseFile string,
@@ -378,6 +378,7 @@ func setupRaftSequencerNode(
378378
if strings.HasPrefix(rpcAddr, "http://") {
379379
rpcAddr = rpcAddr[7:]
380380
}
381+
raftPeers := slices.DeleteFunc(slices.Clone(allRaftClusterMembers), func(v string) bool { return strings.Contains(v, nodeID+"@") || strings.TrimSpace(v) == "" })
381382

382383
// Start node with raft configuration
383384
process := sut.ExecCmdWithLogPrefix(nodeID, evmSingleBinaryPath,

0 commit comments

Comments
 (0)