Skip to content

Commit e23b74b

Browse files
committed
Revert "revert test/e2e/failover_e2e_test.go"
This reverts commit ba677cf.
1 parent ba677cf commit e23b74b

1 file changed

Lines changed: 47 additions & 29 deletions

File tree

test/e2e/failover_e2e_test.go

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
ethtypes "github.com/ethereum/go-ethereum/core/types"
2727
"github.com/ethereum/go-ethereum/crypto"
2828
"github.com/ethereum/go-ethereum/ethclient"
29+
"github.com/libp2p/go-libp2p/core/peer"
2930
"github.com/stretchr/testify/assert"
3031
"github.com/stretchr/testify/require"
3132
"google.golang.org/protobuf/proto"
@@ -34,6 +35,7 @@ import (
3435
evmtest "github.com/evstack/ev-node/execution/evm/test"
3536
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
3637
coreda "github.com/evstack/ev-node/pkg/da/types"
38+
"github.com/evstack/ev-node/pkg/p2p/key"
3739
"github.com/evstack/ev-node/pkg/rpc/client"
3840
rpcclient "github.com/evstack/ev-node/pkg/rpc/client"
3941
"github.com/evstack/ev-node/types"
@@ -82,26 +84,29 @@ func TestLeaseFailoverE2E(t *testing.T) {
8284
clusterNodes := &raftClusterNodes{
8385
nodes: make(map[string]*nodeDetails),
8486
}
85-
node1P2PAddr := env.Endpoints.GetRollkitP2PAddress()
86-
node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress()
87-
node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
87+
node1P2PListen := env.Endpoints.GetRollkitP2PAddress()
88+
node2P2PListen := env.Endpoints.GetFullNodeP2PAddress()
89+
node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
90+
node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen)
91+
node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen)
92+
node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen)
8893

8994
// Start node1 (bootstrap mode)
9095
go func() {
9196
p2pPeers := node2P2PAddr + "," + node3P2PAddr
9297
proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
93-
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(),
98+
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
9499
env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
95-
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
100+
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
96101
t.Log("Node1 is up")
97102
}()
98103

99104
// Start node2 (bootstrap node)
100105
go func() {
101106
t.Log("Starting Node2")
102107
p2pPeers := node1P2PAddr + "," + node3P2PAddr
103-
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
104-
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
108+
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
109+
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
105110
t.Log("Node2 is up")
106111
}()
107112

@@ -112,8 +117,8 @@ func TestLeaseFailoverE2E(t *testing.T) {
112117
p2pPeers := node1P2PAddr + "," + node2P2PAddr
113118
node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t))
114119
ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort)
115-
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile)
116-
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr)
120+
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile)
121+
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr)
117122
t.Log("Node3 is up")
118123
}()
119124

@@ -172,11 +177,11 @@ func TestLeaseFailoverE2E(t *testing.T) {
172177
}
173178
}
174179
oldDetails := clusterNodes.Details(oldLeader)
175-
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
180+
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
176181
t.Log("Restarted old leader to sync with cluster: " + oldLeader)
177182

178183
if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) {
179-
clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.engineURL, oldDetails.ethAddr)
184+
clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.p2pPeerAddr, oldDetails.engineURL, oldDetails.ethAddr)
180185
} else {
181186
t.Log("+++ old leader did not recover on restart. Skipping node verification")
182187
}
@@ -276,26 +281,29 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
276281
clusterNodes := &raftClusterNodes{
277282
nodes: make(map[string]*nodeDetails),
278283
}
279-
node1P2PAddr := env.Endpoints.GetRollkitP2PAddress()
280-
node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress()
281-
node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
284+
node1P2PListen := env.Endpoints.GetRollkitP2PAddress()
285+
node2P2PListen := env.Endpoints.GetFullNodeP2PAddress()
286+
node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
287+
node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen)
288+
node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen)
289+
node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen)
282290

283291
// Start node1 (bootstrap mode)
284292
go func() {
285293
p2pPeers := node2P2PAddr + "," + node3P2PAddr
286294
proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
287-
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(),
295+
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
288296
env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
289-
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
297+
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
290298
t.Log("Node1 is up")
291299
}()
292300

293301
// Start node2 (bootstrap node)
294302
go func() {
295303
t.Log("Starting Node2")
296304
p2pPeers := node1P2PAddr + "," + node3P2PAddr
297-
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
298-
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
305+
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
306+
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
299307
t.Log("Node2 is up")
300308
}()
301309

@@ -306,8 +314,8 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
306314
p2pPeers := node1P2PAddr + "," + node2P2PAddr
307315
node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t))
308316
ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort)
309-
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile)
310-
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr)
317+
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile)
318+
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr)
311319
t.Log("Node3 is up")
312320
}()
313321

@@ -393,7 +401,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
393401
nodeDetails.engineURL, nodeDetails.ethAddr, false, passphraseFile)
394402

395403
clusterNodes.Set(nodeName, nodeDetails.rpcAddr, restartedProc, nodeDetails.ethAddr,
396-
nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
404+
nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.p2pPeerAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
397405
}
398406

399407
// Initial restart of all nodes
@@ -721,6 +729,16 @@ func initChain(t *testing.T, sut *SystemUnderTest, workDir string) string {
721729
require.NoError(t, err, "failed to init node", output)
722730
return passphraseFile
723731
}
732+
733+
func mustNodeP2PMultiAddr(t *testing.T, workDir, nodeID, listenAddr string) string {
734+
t.Helper()
735+
nodeKey, err := key.LoadOrGenNodeKey(filepath.Join(workDir, nodeID, "config"))
736+
require.NoError(t, err)
737+
peerID, err := peer.IDFromPrivateKey(nodeKey.PrivKey)
738+
require.NoError(t, err)
739+
return fmt.Sprintf("%s/p2p/%s", listenAddr, peerID.String())
740+
}
741+
724742
func setupRaftSequencerNode(
725743
t *testing.T,
726744
sut *SystemUnderTest,
@@ -843,6 +861,7 @@ type nodeDetails struct {
843861
xRPCClient atomic.Pointer[rpcclient.Client]
844862
running atomic.Bool
845863
p2pAddr string
864+
p2pPeerAddr string
846865
engineURL string
847866
ethURL string
848867
}
@@ -895,10 +914,10 @@ type raftClusterNodes struct {
895914
nodes map[string]*nodeDetails
896915
}
897916

898-
func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, engineURL string, ethURL string) {
917+
func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, p2pPeerAddr string, engineURL string, ethURL string) {
899918
c.mx.Lock()
900919
defer c.mx.Unlock()
901-
d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, engineURL: engineURL, ethURL: ethURL}
920+
d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, p2pPeerAddr: p2pPeerAddr, engineURL: engineURL, ethURL: ethURL}
902921
d.running.Store(true)
903922
c.nodes[node] = d
904923
}
@@ -946,11 +965,9 @@ func leader(t require.TestingT, nodes map[string]*nodeDetails) (string, *nodeDet
946965
}
947966
resp, err := client.Get(details.rpcAddr + "/raft/node")
948967
require.NoError(t, err)
949-
defer resp.Body.Close()
950-
951968
var status nodeStatus
952969
require.NoError(t, json.NewDecoder(resp.Body).Decode(&status))
953-
970+
_ = resp.Body.Close()
954971
if status.IsLeader {
955972
return node, details
956973
}
@@ -973,16 +990,17 @@ func must[T any](r T, err error) T {
973990
func IsNodeUp(t *testing.T, rpcAddr string, timeout time.Duration) bool {
974991
t.Helper()
975992
t.Logf("Query node is up: %s", rpcAddr)
976-
ctx, done := context.WithTimeout(context.Background(), timeout)
993+
ctx, done := context.WithTimeout(t.Context(), timeout)
977994
defer done()
978995

979-
ticker := time.Tick(min(timeout/10, 200*time.Millisecond))
996+
ticker := time.NewTicker(min(timeout/10, 200*time.Millisecond))
997+
defer ticker.Stop()
980998
c := client.NewClient(rpcAddr)
981999
require.NotNil(t, c)
9821000
var lastBlock uint64
9831001
for {
9841002
select {
985-
case <-ticker:
1003+
case <-ticker.C:
9861004
switch s, err := c.GetState(ctx); {
9871005
case err != nil: // ignore
9881006
case lastBlock == 0:

0 commit comments

Comments
 (0)