Skip to content

Commit 282e7c3

Browse files
Remove zombie node logic and add out-of-sync fallback selection (#85)
* zombie nodes * update * fix comments * update * update
1 parent c69f27e commit 282e7c3

6 files changed

Lines changed: 49 additions & 45 deletions

File tree

multinode/mock_pool_chain_info_provider_test.go

Lines changed: 15 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

multinode/multi_node.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
226226
return nil, err
227227
}
228228

229-
c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
229+
c.lggr.Debugw("Switched to a new active node due to prev node health issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
230230
return c.activeNode, err
231231
}
232232

@@ -254,17 +254,19 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node
254254
}
255255
}
256256

257-
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync.
258-
// Return highest ChainInfo most recently received by the alive nodes.
257+
// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName
258+
// from the count) and the highest ChainInfo most recently received by alive nodes.
259259
// E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12.
260-
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) {
260+
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) {
261261
var nLiveNodes int
262262
ch := ChainInfo{
263263
TotalDifficulty: big.NewInt(0),
264264
}
265265
for _, n := range c.primaryNodes {
266266
if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive {
267-
nLiveNodes++
267+
if n.Name() != callerName {
268+
nLiveNodes++
269+
}
268270
ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber)
269271
ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber)
270272
ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty)

multinode/multi_node_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,14 +602,15 @@ func TestMultiNode_ChainInfo(t *testing.T) {
602602
for i := range testCases {
603603
tc := testCases[i]
604604
t.Run(tc.Name, func(t *testing.T) {
605-
for _, params := range tc.NodeParams {
605+
for i, params := range tc.NodeParams {
606606
node := newMockNode[ID, multiNodeRPCClient](t)
607607
mn.primaryNodes = append(mn.primaryNodes, node)
608+
node.On("Name").Return(fmt.Sprintf("node_%d", i)).Maybe()
608609
node.On("StateAndLatest").Return(params.State, params.LatestChainInfo)
609610
node.On("HighestUserObservations").Return(params.HighestUserObservations)
610611
}
611612

612-
nNodes, latestChainInfo := mn.LatestChainInfo()
613+
nNodes, latestChainInfo := mn.LatestChainInfo("")
613614
assert.Equal(t, tc.ExpectedNLiveNodes, nNodes)
614615
assert.Equal(t, tc.ExpectedLatestChainInfo, latestChainInfo)
615616

multinode/node_lifecycle.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
128128
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
129129
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
130130
if n.poolInfoProvider != nil {
131-
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
131+
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
132132
lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
133133
continue
134134
}
@@ -138,7 +138,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
138138
}
139139
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
140140
// note: there must be another live node for us to be out of sync
141-
if liveNodes < 2 && !n.isLoadBalancedRPC {
141+
if liveNodes < 1 && !n.isLoadBalancedRPC {
142142
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
143143
continue
144144
}
@@ -166,7 +166,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
166166
if n.poolInfoProvider != nil {
167167
// if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo)
168168
// if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc
169-
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
169+
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
170170
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
171171
// We don't necessarily want to wait the full timeout to check again, we should
172172
// check regularly and log noisily in this state
@@ -194,7 +194,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
194194
if n.poolInfoProvider != nil {
195195
// if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo)
196196
// if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc
197-
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
197+
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
198198
lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)
199199
// We don't necessarily want to wait the full timeout to check again, we should
200200
// check regularly and log noisily in this state
@@ -342,7 +342,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN
342342
return // disabled
343343
}
344344
// Check against best node
345-
ln, ci := n.poolInfoProvider.LatestChainInfo()
345+
ln, ci := n.poolInfoProvider.LatestChainInfo(n.name)
346346
localChainInfo, _ := n.rpc.GetInterceptedChainInfo()
347347
mode := n.nodePoolCfg.SelectionMode()
348348
switch mode {
@@ -459,7 +459,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
459459
lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues)
460460
case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)):
461461
if n.poolInfoProvider != nil {
462-
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 {
462+
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 {
463463
if n.isLoadBalancedRPC {
464464
// in case all rpcs behind a load balanced rpc are out of sync, we need to declare out of sync to prevent false transition to alive
465465
n.declareOutOfSync(syncIssues)

multinode/node_lifecycle_test.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
191191
})
192192
defer func() { assert.NoError(t, node.close()) }()
193193
poolInfo := newMockPoolChainInfoProvider(t)
194-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
194+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
195195
BlockNumber: 20,
196196
}).Once()
197197
node.SetPoolChainInfoProvider(poolInfo)
@@ -218,7 +218,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
218218
})
219219
defer func() { assert.NoError(t, node.close()) }()
220220
poolInfo := newMockPoolChainInfoProvider(t)
221-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
221+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
222222
BlockNumber: 20,
223223
}).Once()
224224
node.SetPoolChainInfoProvider(poolInfo)
@@ -250,7 +250,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
250250
const mostRecentBlock = 20
251251
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
252252
poolInfo := newMockPoolChainInfoProvider(t)
253-
poolInfo.On("LatestChainInfo").Return(10, ChainInfo{
253+
poolInfo.On("LatestChainInfo", mock.Anything).Return(10, ChainInfo{
254254
BlockNumber: syncThreshold + mostRecentBlock + 1,
255255
TotalDifficulty: big.NewInt(10),
256256
})
@@ -285,7 +285,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
285285
const mostRecentBlock = 20
286286
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
287287
poolInfo := newMockPoolChainInfoProvider(t)
288-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
288+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
289289
BlockNumber: syncThreshold + mostRecentBlock + 1,
290290
TotalDifficulty: big.NewInt(10),
291291
})
@@ -313,7 +313,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
313313
const mostRecentBlock = 20
314314
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice()
315315
poolInfo := newMockPoolChainInfoProvider(t)
316-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
316+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
317317
BlockNumber: syncThreshold + mostRecentBlock + 1,
318318
TotalDifficulty: big.NewInt(10),
319319
})
@@ -389,7 +389,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
389389
})
390390
defer func() { assert.NoError(t, node.close()) }()
391391
poolInfo := newMockPoolChainInfoProvider(t)
392-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
392+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
393393
BlockNumber: 20,
394394
TotalDifficulty: big.NewInt(10),
395395
}).Once()
@@ -414,7 +414,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
414414
})
415415
defer func() { assert.NoError(t, node.close()) }()
416416
poolInfo := newMockPoolChainInfoProvider(t)
417-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
417+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
418418
BlockNumber: 20,
419419
TotalDifficulty: big.NewInt(10),
420420
}).Once()
@@ -640,7 +640,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
640640
})
641641
defer func() { assert.NoError(t, node.close()) }()
642642
poolInfo := newMockPoolChainInfoProvider(t)
643-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
643+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
644644
BlockNumber: 20,
645645
TotalDifficulty: big.NewInt(10),
646646
}).Once()
@@ -668,13 +668,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
668668
})
669669
defer func() { assert.NoError(t, node.close()) }()
670670
poolInfo := newMockPoolChainInfoProvider(t)
671-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
671+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
672672
BlockNumber: 20,
673673
TotalDifficulty: big.NewInt(10),
674674
}).Once()
675675
node.SetPoolChainInfoProvider(poolInfo)
676676
// tries to redial in outOfSync
677-
// tries to redial in outOfSync
678677
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
679678
assert.Equal(t, nodeStateOutOfSync, node.State())
680679
}).Once()
@@ -1043,7 +1042,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
10431042
})
10441043
defer func() { assert.NoError(t, node.close()) }()
10451044
poolInfo := newMockPoolChainInfoProvider(t)
1046-
poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
1045+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
10471046
BlockNumber: 100,
10481047
TotalDifficulty: big.NewInt(200),
10491048
})
@@ -1081,7 +1080,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
10811080
})
10821081
defer func() { assert.NoError(t, node.close()) }()
10831082
poolInfo := newMockPoolChainInfoProvider(t)
1084-
poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
1083+
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
10851084
BlockNumber: 100,
10861085
TotalDifficulty: big.NewInt(200),
10871086
})
@@ -1121,7 +1120,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
11211120
defer func() { assert.NoError(t, node.close()) }()
11221121
poolInfo := newMockPoolChainInfoProvider(t)
11231122
const highestBlock = 20
1124-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
1123+
poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{
11251124
BlockNumber: highestBlock * 2,
11261125
TotalDifficulty: big.NewInt(200),
11271126
})
@@ -1774,7 +1773,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
17741773
config: testNodeConfig{syncThreshold: 1},
17751774
})
17761775
poolInfo := newMockPoolChainInfoProvider(t)
1777-
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once()
1776+
poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{}).Once()
17781777
node.SetPoolChainInfoProvider(poolInfo)
17791778
assert.Panics(t, func() {
17801779
_, _ = node.isOutOfSyncWithPool()
@@ -1820,7 +1819,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
18201819
},
18211820
})
18221821
poolInfo := newMockPoolChainInfoProvider(t)
1823-
poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{
1822+
poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{
18241823
BlockNumber: highestBlock,
18251824
TotalDifficulty: big.NewInt(totalDifficulty),
18261825
})
@@ -1880,7 +1879,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
18801879
})
18811880

18821881
poolInfo := newMockPoolChainInfoProvider(t)
1883-
poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{
1882+
poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{
18841883
BlockNumber: highestBlock,
18851884
TotalDifficulty: big.NewInt(totalDifficulty),
18861885
})

multinode/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,12 @@ type Head interface {
8989

9090
// PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo
9191
type PoolChainInfoProvider interface {
92-
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being
92+
// LatestChainInfo returns the number of live nodes available in the pool (excluding the node identified by
93+
// callerName from the count), so we can prevent the last alive node in a pool from being
9394
// moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all.
9495
// Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number
9596
// observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
96-
LatestChainInfo() (int, ChainInfo)
97+
LatestChainInfo(callerName string) (int, ChainInfo)
9798
// HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode.
9899
HighestUserObservations() ChainInfo
99100
}

0 commit comments

Comments
 (0)