Skip to content

Commit e82bc07

Browse files
Add RandomRPC node selection mode (#84)
* implement random rpc * fix linter issues * fix linter issues * update
1 parent d78c798 commit e82bc07

5 files changed

Lines changed: 265 additions & 8 deletions

File tree

multinode/multi_node.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) start(ctx context.Context) error {
164164
}
165165
c.eng.Go(c.runLoop)
166166

167-
if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin {
167+
if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin && c.selectionMode != NodeSelectionModeRandomRPC {
168168
c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String())
169169
c.eng.Go(c.checkLeaseLoop)
170170
} else {
@@ -192,6 +192,10 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err
192192

193193
// selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector.
194194
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) {
195+
if c.selectionMode == NodeSelectionModeRandomRPC {
196+
return c.awaitNodeSelection(ctx)
197+
}
198+
195199
c.activeMu.RLock()
196200
node = c.activeNode
197201
c.activeMu.RUnlock()
@@ -213,15 +217,26 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
213217
c.activeNode.UnsubscribeAllExceptAliveLoop()
214218
}
215219

220+
c.activeNode, err = c.awaitNodeSelection(ctx)
221+
if err != nil {
222+
return nil, err
223+
}
224+
225+
c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
226+
return c.activeNode, err
227+
}
228+
229+
// awaitNodeSelection blocks until nodeSelector returns a live node or all nodes
230+
// finish initializing. Returns ErrNodeError when no live nodes are available.
231+
func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node[CHAIN_ID, RPC], error) {
216232
for {
217-
c.activeNode = c.nodeSelector.Select()
218-
if c.activeNode != nil {
219-
break
233+
node := c.nodeSelector.Select()
234+
if node != nil {
235+
return node, nil
220236
}
221237
if slices.ContainsFunc(c.primaryNodes, func(n Node[CHAIN_ID, RPC]) bool {
222238
return n.State().isInitializing()
223239
}) {
224-
// initial dial still in-progress - retry until done
225240
select {
226241
case <-ctx.Done():
227242
return nil, ctx.Err()
@@ -233,9 +248,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
233248
c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String()))
234249
return nil, ErrNodeError
235250
}
236-
237-
c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
238-
return c.activeNode, err
239251
}
240252

241253
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync.

multinode/multi_node_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,101 @@ func TestMultiNode_selectNode(t *testing.T) {
406406
})
407407
}
408408

409+
func TestMultiNode_RandomRPC(t *testing.T) {
410+
t.Parallel()
411+
t.Run("RandomRPC disables lease check", func(t *testing.T) {
412+
t.Parallel()
413+
chainID := RandomID()
414+
node := newHealthyNode(t, chainID)
415+
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
416+
mn := newTestMultiNode(t, multiNodeOpts{
417+
selectionMode: NodeSelectionModeRandomRPC,
418+
chainID: chainID,
419+
logger: lggr,
420+
nodes: []Node[ID, multiNodeRPCClient]{node},
421+
})
422+
servicetest.Run(t, mn)
423+
tests.RequireLogMessage(t, observedLogs, "Best node switching is disabled")
424+
})
425+
t.Run("RandomRPC is non-sticky, calls Select on every invocation", func(t *testing.T) {
426+
t.Parallel()
427+
ctx := t.Context()
428+
chainID := RandomID()
429+
node1 := newMockNode[ID, multiNodeRPCClient](t)
430+
node1.On("State").Return(nodeStateAlive).Maybe()
431+
node1.On("String").Return("node1").Maybe()
432+
node2 := newMockNode[ID, multiNodeRPCClient](t)
433+
node2.On("State").Return(nodeStateAlive).Maybe()
434+
node2.On("String").Return("node2").Maybe()
435+
mn := newTestMultiNode(t, multiNodeOpts{
436+
selectionMode: NodeSelectionModeRandomRPC,
437+
chainID: chainID,
438+
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
439+
})
440+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
441+
nodeSelector.On("Select").Return(node1).Once()
442+
nodeSelector.On("Select").Return(node2).Once()
443+
mn.nodeSelector = nodeSelector
444+
445+
first, err := mn.selectNode(ctx)
446+
require.NoError(t, err)
447+
assert.Same(t, node1, first)
448+
449+
second, err := mn.selectNode(ctx)
450+
require.NoError(t, err)
451+
assert.Same(t, node2, second)
452+
})
453+
t.Run("RandomRPC does not unsubscribe previous node on selection", func(t *testing.T) {
454+
t.Parallel()
455+
ctx := t.Context()
456+
chainID := RandomID()
457+
node1 := newMockNode[ID, multiNodeRPCClient](t)
458+
node1.On("State").Return(nodeStateAlive).Maybe()
459+
node1.On("String").Return("node1").Maybe()
460+
node2 := newMockNode[ID, multiNodeRPCClient](t)
461+
node2.On("State").Return(nodeStateAlive).Maybe()
462+
node2.On("String").Return("node2").Maybe()
463+
mn := newTestMultiNode(t, multiNodeOpts{
464+
selectionMode: NodeSelectionModeRandomRPC,
465+
chainID: chainID,
466+
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
467+
})
468+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
469+
nodeSelector.On("Select").Return(node1).Once()
470+
nodeSelector.On("Select").Return(node2).Once()
471+
mn.nodeSelector = nodeSelector
472+
473+
_, err := mn.selectNode(ctx)
474+
require.NoError(t, err)
475+
_, err = mn.selectNode(ctx)
476+
require.NoError(t, err)
477+
478+
// UnsubscribeAllExceptAliveLoop must NOT have been called on either node.
479+
// mockNode would fail the test if an unexpected call was made.
480+
node1.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
481+
node2.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
482+
})
483+
t.Run("RandomRPC reports error when no nodes available", func(t *testing.T) {
484+
t.Parallel()
485+
ctx := t.Context()
486+
chainID := RandomID()
487+
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
488+
mn := newTestMultiNode(t, multiNodeOpts{
489+
selectionMode: NodeSelectionModeRandomRPC,
490+
chainID: chainID,
491+
logger: lggr,
492+
})
493+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
494+
nodeSelector.On("Select").Return(nil).Once()
495+
nodeSelector.On("Name").Return("MockedNodeSelector").Once()
496+
mn.nodeSelector = nodeSelector
497+
node, err := mn.selectNode(ctx)
498+
require.EqualError(t, err, ErrNodeError.Error())
499+
require.Nil(t, node)
500+
tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available")
501+
})
502+
}
503+
409504
func TestMultiNode_ChainInfo(t *testing.T) {
410505
t.Parallel()
411506
type nodeParams struct {

multinode/node_selector.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const (
99
NodeSelectionModeRoundRobin = "RoundRobin"
1010
NodeSelectionModeTotalDifficulty = "TotalDifficulty"
1111
NodeSelectionModePriorityLevel = "PriorityLevel"
12+
NodeSelectionModeRandomRPC = "RandomRPC"
1213
)
1314

1415
type NodeSelector[
@@ -35,6 +36,8 @@ func newNodeSelector[
3536
return NewTotalDifficultyNodeSelector[CHAIN_ID, RPC](nodes)
3637
case NodeSelectionModePriorityLevel:
3738
return NewPriorityLevelNodeSelector[CHAIN_ID, RPC](nodes)
39+
case NodeSelectionModeRandomRPC:
40+
return NewRandomRPCSelector[CHAIN_ID, RPC](nodes)
3841
default:
3942
panic(fmt.Sprintf("unsupported NodeSelectionMode: %s", selectionMode))
4043
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package multinode
2+
3+
import (
4+
"math/rand/v2"
5+
)
6+
7+
type randomRPCSelector[
8+
CHAIN_ID ID,
9+
RPC any,
10+
] struct {
11+
nodes []Node[CHAIN_ID, RPC]
12+
}
13+
14+
func NewRandomRPCSelector[
15+
CHAIN_ID ID,
16+
RPC any,
17+
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] {
18+
return &randomRPCSelector[CHAIN_ID, RPC]{
19+
nodes: nodes,
20+
}
21+
}
22+
23+
func (s *randomRPCSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] {
24+
var liveNodes []Node[CHAIN_ID, RPC]
25+
for _, n := range s.nodes {
26+
if n.State() == nodeStateAlive {
27+
liveNodes = append(liveNodes, n)
28+
} else {
29+
n.UnsubscribeAllExceptAliveLoop()
30+
}
31+
}
32+
33+
if len(liveNodes) == 0 {
34+
return nil
35+
}
36+
37+
// #nosec G404
38+
return liveNodes[rand.IntN(len(liveNodes))]
39+
}
40+
41+
func (s *randomRPCSelector[CHAIN_ID, RPC]) Name() string {
42+
return NodeSelectionModeRandomRPC
43+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package multinode
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestRandomRPCNodeSelectorName(t *testing.T) {
10+
selector := newNodeSelector[ID, RPCClient[ID, Head]](NodeSelectionModeRandomRPC, nil)
11+
assert.Equal(t, NodeSelectionModeRandomRPC, selector.Name())
12+
}
13+
14+
func TestRandomRPCNodeSelector(t *testing.T) {
15+
t.Parallel()
16+
17+
type nodeClient RPCClient[ID, Head]
18+
var nodes []Node[ID, nodeClient]
19+
20+
for i := 0; i < 3; i++ {
21+
node := newMockNode[ID, nodeClient](t)
22+
if i == 0 {
23+
node.On("State").Return(nodeStateOutOfSync)
24+
node.On("UnsubscribeAllExceptAliveLoop")
25+
} else {
26+
node.On("State").Return(nodeStateAlive)
27+
}
28+
nodes = append(nodes, node)
29+
}
30+
31+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
32+
33+
// All selections should be from alive nodes only
34+
for i := 0; i < 20; i++ {
35+
selected := selector.Select()
36+
assert.NotNil(t, selected)
37+
assert.Contains(t, []Node[ID, nodeClient]{nodes[1], nodes[2]}, selected)
38+
}
39+
}
40+
41+
func TestRandomRPCNodeSelector_None(t *testing.T) {
42+
t.Parallel()
43+
44+
type nodeClient RPCClient[ID, Head]
45+
var nodes []Node[ID, nodeClient]
46+
47+
for i := 0; i < 3; i++ {
48+
node := newMockNode[ID, nodeClient](t)
49+
if i == 0 {
50+
node.On("State").Return(nodeStateOutOfSync)
51+
} else {
52+
node.On("State").Return(nodeStateUnreachable)
53+
}
54+
node.On("UnsubscribeAllExceptAliveLoop")
55+
nodes = append(nodes, node)
56+
}
57+
58+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
59+
assert.Nil(t, selector.Select())
60+
}
61+
62+
func TestRandomRPCNodeSelector_Distribution(t *testing.T) {
63+
t.Parallel()
64+
65+
type nodeClient RPCClient[ID, Head]
66+
var nodes []Node[ID, nodeClient]
67+
68+
const nAlive = 3
69+
for i := 0; i < nAlive; i++ {
70+
node := newMockNode[ID, nodeClient](t)
71+
node.On("State").Return(nodeStateAlive)
72+
nodes = append(nodes, node)
73+
}
74+
75+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
76+
77+
const iterations = 1000
78+
counts := make(map[Node[ID, nodeClient]]int, nAlive)
79+
for i := 0; i < iterations; i++ {
80+
selected := selector.Select()
81+
assert.NotNil(t, selected)
82+
counts[selected]++
83+
}
84+
85+
// Each node should be selected at least once with overwhelming probability
86+
for _, n := range nodes {
87+
assert.Positive(t, counts[n], "expected every alive node to be selected at least once")
88+
}
89+
}
90+
91+
func TestRandomRPCNodeSelector_SingleNode(t *testing.T) {
92+
t.Parallel()
93+
94+
type nodeClient RPCClient[ID, Head]
95+
96+
node := newMockNode[ID, nodeClient](t)
97+
node.On("State").Return(nodeStateAlive)
98+
99+
selector := newNodeSelector(NodeSelectionModeRandomRPC, []Node[ID, nodeClient]{node})
100+
101+
for i := 0; i < 5; i++ {
102+
assert.Same(t, node, selector.Select())
103+
}
104+
}

0 commit comments

Comments
 (0)