Skip to content

Commit d030e50

Browse files
update
1 parent 89eeb66 commit d030e50

5 files changed

Lines changed: 59 additions & 230 deletions

File tree

multinode/multi_node.go

Lines changed: 7 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package multinode
33
import (
44
"context"
55
"fmt"
6-
"math"
76
"math/big"
87
"slices"
98
"sync"
@@ -192,8 +191,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err
192191
}
193192

194193
// selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector.
195-
// If no alive node is available, it falls back to an out-of-sync node. If the current active node is out-of-sync,
196-
// it will try to upgrade to an alive node when one becomes available.
197194
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) {
198195
if c.selectionMode == NodeSelectionModeRandomRPC {
199196
return c.awaitNodeSelection(ctx)
@@ -214,17 +211,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
214211
return // another goroutine beat us here
215212
}
216213

217-
// If the current active node is out-of-sync, try to find an alive one first
218-
if node != nil && isUsableState(node.State()) {
219-
if aliveNode := c.nodeSelector.Select(); aliveNode != nil {
220-
c.activeNode.UnsubscribeAllExceptAliveLoop()
221-
c.activeNode = aliveNode
222-
c.lggr.Debugw("Upgraded from out-of-sync to alive node", "prevNode", node.String(), "newNode", aliveNode.String())
223-
return c.activeNode, nil
224-
}
225-
return // keep using the out-of-sync node
226-
}
227-
228214
var prevNodeName string
229215
if c.activeNode != nil {
230216
prevNodeName = c.activeNode.String()
@@ -241,8 +227,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
241227
}
242228

243229
// awaitNodeSelection blocks until nodeSelector returns a live node or all nodes
244-
// finish initializing. If no alive nodes are available, falls back to an out-of-sync node.
245-
// Returns ErrNodeError when no usable nodes are available.
230+
// finish initializing. Returns ErrNodeError when no live nodes are available.
246231
func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node[CHAIN_ID, RPC], error) {
247232
for {
248233
node := c.nodeSelector.Select()
@@ -259,54 +244,25 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node
259244
continue
260245
}
261246
}
262-
if fallback := c.selectOutOfSyncNode(); fallback != nil {
263-
c.lggr.Criticalw("No alive RPC nodes available, falling back to out-of-sync node", "node", fallback.String())
264-
c.eng.EmitHealthErr(fmt.Errorf("no alive nodes available for chain %s, using out-of-sync fallback", c.chainID.String()))
265-
return fallback, nil
266-
}
267247
c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
268248
c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String()))
269249
return nil, ErrNodeError
270250
}
271251
}
272252

273-
// selectOutOfSyncNode picks the best out-of-sync node by highest block number.
274-
// Returns nil if no out-of-sync nodes are available.
275-
func (c *MultiNode[CHAIN_ID, RPC]) selectOutOfSyncNode() Node[CHAIN_ID, RPC] {
276-
var bestNode Node[CHAIN_ID, RPC]
277-
var bestBlock int64 = math.MinInt64
278-
for _, n := range c.primaryNodes {
279-
if isUsableState(n.State()) {
280-
_, ci := n.StateAndLatest()
281-
if ci.BlockNumber > bestBlock {
282-
bestBlock = ci.BlockNumber
283-
bestNode = n
284-
}
285-
}
286-
}
287-
return bestNode
288-
}
289-
290-
// isUsableState returns true for out-of-sync states that can still serve requests as a fallback.
291-
// nodeStateFinalizedBlockOutOfSync is intentionally excluded to prevent local finality violations.
292-
func isUsableState(s nodeState) bool {
293-
return s == nodeStateOutOfSync
294-
}
295-
296-
// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName)
297-
// and the highest ChainInfo most recently received by those nodes.
253+
// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName
254+
// from the count) and the highest ChainInfo most recently received by alive nodes.
298255
// E.g. if Node A's most recent block is 10 and its highest is 15, and Node B's are 12 and 14, this method will return 12.
299256
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) {
300257
var nLiveNodes int
301258
ch := ChainInfo{
302259
TotalDifficulty: big.NewInt(0),
303260
}
304261
for _, n := range c.primaryNodes {
305-
if n.Name() == callerName {
306-
continue
307-
}
308262
if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive {
309-
nLiveNodes++
263+
if n.Name() != callerName {
264+
nLiveNodes++
265+
}
310266
ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber)
311267
ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber)
312268
ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty)
@@ -342,12 +298,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLease() {
342298

343299
c.activeMu.Lock()
344300
defer c.activeMu.Unlock()
345-
346-
if bestNode == nil {
347-
bestNode = c.selectOutOfSyncNode()
348-
}
349-
350-
if bestNode != nil && bestNode != c.activeNode {
301+
if bestNode != c.activeNode {
351302
if c.activeNode != nil {
352303
c.activeNode.UnsubscribeAllExceptAliveLoop()
353304
}

multinode/multi_node_test.go

Lines changed: 2 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,8 @@ func TestMultiNode_selectNode(t *testing.T) {
378378
activeNode, err := mn.selectNode(ctx)
379379
require.NoError(t, err)
380380
require.Equal(t, oldBest.String(), activeNode.String())
381-
// old best is out-of-sync, a new alive node is available via selector
382-
oldBest.On("State").Return(nodeStateOutOfSync).Maybe()
381+
// old best died, so we should replace it
382+
oldBest.On("State").Return(nodeStateOutOfSync).Twice()
383383
nodeSelector.On("Select").Return(newBest).Once()
384384
newActiveNode, err := mn.selectNode(ctx)
385385
require.NoError(t, err)
@@ -404,143 +404,6 @@ func TestMultiNode_selectNode(t *testing.T) {
404404
require.Nil(t, node)
405405
tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available")
406406
})
407-
t.Run("Falls back to out-of-sync node when no alive nodes available", func(t *testing.T) {
408-
t.Parallel()
409-
ctx := tests.Context(t)
410-
chainID := RandomID()
411-
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
412-
oosNode := newMockNode[ID, multiNodeRPCClient](t)
413-
oosNode.On("State").Return(nodeStateOutOfSync).Maybe()
414-
oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe()
415-
oosNode.On("String").Return("oosNode").Maybe()
416-
unreachableNode := newMockNode[ID, multiNodeRPCClient](t)
417-
unreachableNode.On("State").Return(nodeStateUnreachable).Maybe()
418-
unreachableNode.On("String").Return("unreachableNode").Maybe()
419-
mn := newTestMultiNode(t, multiNodeOpts{
420-
selectionMode: NodeSelectionModeRoundRobin,
421-
chainID: chainID,
422-
nodes: []Node[ID, multiNodeRPCClient]{oosNode, unreachableNode},
423-
logger: lggr,
424-
})
425-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
426-
nodeSelector.On("Select").Return(nil)
427-
mn.nodeSelector = nodeSelector
428-
selected, err := mn.selectNode(ctx)
429-
require.NoError(t, err)
430-
assert.Equal(t, oosNode, selected)
431-
tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node")
432-
})
433-
t.Run("Does not fall back to FinalizedBlockOutOfSync node", func(t *testing.T) {
434-
t.Parallel()
435-
ctx := tests.Context(t)
436-
chainID := RandomID()
437-
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
438-
fbOosNode := newMockNode[ID, multiNodeRPCClient](t)
439-
fbOosNode.On("State").Return(nodeStateFinalizedBlockOutOfSync).Maybe()
440-
fbOosNode.On("String").Return("fbOosNode").Maybe()
441-
mn := newTestMultiNode(t, multiNodeOpts{
442-
selectionMode: NodeSelectionModeRoundRobin,
443-
chainID: chainID,
444-
nodes: []Node[ID, multiNodeRPCClient]{fbOosNode},
445-
logger: lggr,
446-
})
447-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
448-
nodeSelector.On("Select").Return(nil)
449-
nodeSelector.On("Name").Return("MockedNodeSelector")
450-
mn.nodeSelector = nodeSelector
451-
selected, err := mn.selectNode(ctx)
452-
require.EqualError(t, err, ErrNodeError.Error())
453-
require.Nil(t, selected)
454-
tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available")
455-
})
456-
t.Run("Selects best out-of-sync node by highest block number", func(t *testing.T) {
457-
t.Parallel()
458-
ctx := tests.Context(t)
459-
chainID := RandomID()
460-
lggr, _ := logger.TestObserved(t, zap.WarnLevel)
461-
oosLow := newMockNode[ID, multiNodeRPCClient](t)
462-
oosLow.On("State").Return(nodeStateOutOfSync).Maybe()
463-
oosLow.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 30}).Maybe()
464-
oosLow.On("String").Return("oosLow").Maybe()
465-
oosHigh := newMockNode[ID, multiNodeRPCClient](t)
466-
oosHigh.On("State").Return(nodeStateOutOfSync).Maybe()
467-
oosHigh.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 90}).Maybe()
468-
oosHigh.On("String").Return("oosHigh").Maybe()
469-
mn := newTestMultiNode(t, multiNodeOpts{
470-
selectionMode: NodeSelectionModeRoundRobin,
471-
chainID: chainID,
472-
nodes: []Node[ID, multiNodeRPCClient]{oosLow, oosHigh},
473-
logger: lggr,
474-
})
475-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
476-
nodeSelector.On("Select").Return(nil)
477-
mn.nodeSelector = nodeSelector
478-
selected, err := mn.selectNode(ctx)
479-
require.NoError(t, err)
480-
assert.Equal(t, oosHigh, selected)
481-
})
482-
t.Run("Keeps out-of-sync active node when no alive node becomes available", func(t *testing.T) {
483-
t.Parallel()
484-
ctx := tests.Context(t)
485-
chainID := RandomID()
486-
oosNode := newMockNode[ID, multiNodeRPCClient](t)
487-
oosNode.On("State").Return(nodeStateOutOfSync).Maybe()
488-
oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe()
489-
oosNode.On("String").Return("oosNode").Maybe()
490-
oosNode2 := newMockNode[ID, multiNodeRPCClient](t)
491-
oosNode2.On("State").Return(nodeStateOutOfSync).Maybe()
492-
oosNode2.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 40}).Maybe()
493-
oosNode2.On("String").Return("oosNode2").Maybe()
494-
mn := newTestMultiNode(t, multiNodeOpts{
495-
selectionMode: NodeSelectionModeRoundRobin,
496-
chainID: chainID,
497-
nodes: []Node[ID, multiNodeRPCClient]{oosNode, oosNode2},
498-
})
499-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
500-
nodeSelector.On("Select").Return(nil)
501-
mn.nodeSelector = nodeSelector
502-
// First call falls back to best out-of-sync (oosNode has higher block)
503-
first, err := mn.selectNode(ctx)
504-
require.NoError(t, err)
505-
assert.Equal(t, oosNode, first)
506-
// Second call: still no alive, keeps the same out-of-sync node (no switch to oosNode2)
507-
second, err := mn.selectNode(ctx)
508-
require.NoError(t, err)
509-
assert.Equal(t, oosNode, second)
510-
})
511-
t.Run("Upgrades from out-of-sync active to alive node when one becomes available", func(t *testing.T) {
512-
t.Parallel()
513-
ctx := tests.Context(t)
514-
chainID := RandomID()
515-
lggr, _ := logger.TestObserved(t, zap.DebugLevel)
516-
oosNode := newMockNode[ID, multiNodeRPCClient](t)
517-
oosNode.On("State").Return(nodeStateOutOfSync).Maybe()
518-
oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe()
519-
oosNode.On("String").Return("oosNode").Maybe()
520-
oosNode.On("UnsubscribeAllExceptAliveLoop").Maybe()
521-
aliveNode := newMockNode[ID, multiNodeRPCClient](t)
522-
aliveNode.On("State").Return(nodeStateAlive).Maybe()
523-
aliveNode.On("String").Return("aliveNode").Maybe()
524-
mn := newTestMultiNode(t, multiNodeOpts{
525-
selectionMode: NodeSelectionModeRoundRobin,
526-
chainID: chainID,
527-
nodes: []Node[ID, multiNodeRPCClient]{oosNode, aliveNode},
528-
logger: lggr,
529-
})
530-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
531-
// First select returns nil (no alive), second returns alive
532-
nodeSelector.On("Select").Return(nil).Once()
533-
mn.nodeSelector = nodeSelector
534-
// First selection falls back to out-of-sync
535-
first, err := mn.selectNode(ctx)
536-
require.NoError(t, err)
537-
assert.Equal(t, oosNode, first)
538-
// Now an alive node becomes available
539-
nodeSelector.On("Select").Return(aliveNode).Once()
540-
second, err := mn.selectNode(ctx)
541-
require.NoError(t, err)
542-
assert.Equal(t, aliveNode, second)
543-
})
544407
}
545408

546409
func TestMultiNode_RandomRPC(t *testing.T) {
@@ -617,36 +480,6 @@ func TestMultiNode_RandomRPC(t *testing.T) {
617480
node1.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
618481
node2.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
619482
})
620-
t.Run("RandomRPC falls back to out-of-sync node when no alive nodes available", func(t *testing.T) {
621-
t.Parallel()
622-
ctx := t.Context()
623-
chainID := RandomID()
624-
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
625-
oosNode := newMockNode[ID, multiNodeRPCClient](t)
626-
oosNode.On("State").Return(nodeStateOutOfSync).Maybe()
627-
oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe()
628-
oosNode.On("String").Return("oosNode").Maybe()
629-
mn := newTestMultiNode(t, multiNodeOpts{
630-
selectionMode: NodeSelectionModeRandomRPC,
631-
chainID: chainID,
632-
nodes: []Node[ID, multiNodeRPCClient]{oosNode},
633-
logger: lggr,
634-
})
635-
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
636-
nodeSelector.On("Select").Return(nil)
637-
mn.nodeSelector = nodeSelector
638-
639-
first, err := mn.selectNode(ctx)
640-
require.NoError(t, err)
641-
assert.Equal(t, oosNode, first)
642-
tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node")
643-
644-
second, err := mn.selectNode(ctx)
645-
require.NoError(t, err)
646-
assert.Equal(t, oosNode, second)
647-
648-
oosNode.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
649-
})
650483
t.Run("RandomRPC reports error when no nodes available", func(t *testing.T) {
651484
t.Parallel()
652485
ctx := t.Context()

multinode/node_lifecycle.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
136136
n.declareUnreachable()
137137
return
138138
}
139-
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
140-
if liveNodes < 1 && !n.isLoadBalancedRPC {
139+
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
140+
// note: there must be another live node for us to be out of sync
141+
if liveNodes < 1 && !n.isLoadBalancedRPC {
141142
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
142143
continue
143144
}
@@ -163,8 +164,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
163164
// threshold amount of time, mark it broken
164165
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
165166
if n.poolInfoProvider != nil {
167+
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
168+
// if it's a proxy, then declare out of sync and try reconnecting because the proxy might return a healthier RPC
166169
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
167170
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
171+
// We don't necessarily want to wait the full timeout to check again, we should
172+
// check regularly and log noisily in this state
168173
headsSub.ResetTimer(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold))
169174
continue
170175
}
@@ -187,8 +192,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
187192
// threshold amount of time, mark it broken
188193
lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber)
189194
if n.poolInfoProvider != nil {
195+
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
196+
// if it's a proxy, then declare out of sync and try reconnecting because the proxy might return a healthier RPC
190197
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
191198
lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)
199+
// We don't necessarily want to wait the full timeout to check again, we should
200+
// check regularly and log noisily in this state
192201
finalizedHeadsSub.ResetTimer(zombieNodeCheckInterval(noNewFinalizedBlocksTimeoutThreshold))
193202
continue
194203
}
@@ -452,6 +461,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
452461
if n.poolInfoProvider != nil {
453462
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 {
454463
if n.isLoadBalancedRPC {
464+
// in case all RPCs behind a load-balanced RPC are out of sync, we need to declare out of sync to prevent a false transition to alive
455465
n.declareOutOfSync(syncIssues)
456466
return
457467
}

0 commit comments

Comments
 (0)