Skip to content

Commit 327af1e

Browse files
implement random rpc
1 parent d78c798 commit 327af1e

5 files changed

Lines changed: 269 additions & 1 deletion

File tree

multinode/multi_node.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) start(ctx context.Context) error {
164164
}
165165
c.eng.Go(c.runLoop)
166166

167-
if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin {
167+
if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin && c.selectionMode != NodeSelectionModeRandomRPC {
168168
c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String())
169169
c.eng.Go(c.checkLeaseLoop)
170170
} else {
@@ -192,6 +192,10 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err
192192

193193
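// selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector.
// In NodeSelectionModeRandomRPC the active node is bypassed and selectRandomRPCNode is used on every call.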
194194
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) {
195+
if c.selectionMode == NodeSelectionModeRandomRPC {
196+
return c.selectRandomRPCNode(ctx)
197+
}
198+
195199
c.activeMu.RLock()
196200
node = c.activeNode
197201
c.activeMu.RUnlock()
@@ -238,6 +242,30 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
238242
return c.activeNode, err
239243
}
240244

245+
// selectRandomRPCNode picks a random healthy node on every call without caching
246+
// or terminating existing subscriptions on other nodes.
247+
func (c *MultiNode[CHAIN_ID, RPC]) selectRandomRPCNode(ctx context.Context) (Node[CHAIN_ID, RPC], error) {
248+
for {
249+
node := c.nodeSelector.Select()
250+
if node != nil {
251+
return node, nil
252+
}
253+
if slices.ContainsFunc(c.primaryNodes, func(n Node[CHAIN_ID, RPC]) bool {
254+
return n.State().isInitializing()
255+
}) {
256+
select {
257+
case <-ctx.Done():
258+
return nil, ctx.Err()
259+
case <-time.After(100 * time.Millisecond):
260+
continue
261+
}
262+
}
263+
c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
264+
c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String()))
265+
return nil, ErrNodeError
266+
}
267+
}
268+
241269
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync.
242270
// Return highest ChainInfo most recently received by the alive nodes.
243271
// E.g. if Node A's most recent block is 10 and its highest is 15, while Node B's are 12 and 14, this method will return 12.

multinode/multi_node_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,101 @@ func TestMultiNode_selectNode(t *testing.T) {
406406
})
407407
}
408408

409+
func TestMultiNode_RandomRPC(t *testing.T) {
410+
t.Parallel()
411+
t.Run("RandomRPC disables lease check", func(t *testing.T) {
412+
t.Parallel()
413+
chainID := RandomID()
414+
node := newHealthyNode(t, chainID)
415+
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
416+
mn := newTestMultiNode(t, multiNodeOpts{
417+
selectionMode: NodeSelectionModeRandomRPC,
418+
chainID: chainID,
419+
logger: lggr,
420+
nodes: []Node[ID, multiNodeRPCClient]{node},
421+
})
422+
servicetest.Run(t, mn)
423+
tests.RequireLogMessage(t, observedLogs, "Best node switching is disabled")
424+
})
425+
t.Run("RandomRPC is non-sticky, calls Select on every invocation", func(t *testing.T) {
426+
t.Parallel()
427+
ctx := tests.Context(t)
428+
chainID := RandomID()
429+
node1 := newMockNode[ID, multiNodeRPCClient](t)
430+
node1.On("State").Return(nodeStateAlive).Maybe()
431+
node1.On("String").Return("node1").Maybe()
432+
node2 := newMockNode[ID, multiNodeRPCClient](t)
433+
node2.On("State").Return(nodeStateAlive).Maybe()
434+
node2.On("String").Return("node2").Maybe()
435+
mn := newTestMultiNode(t, multiNodeOpts{
436+
selectionMode: NodeSelectionModeRandomRPC,
437+
chainID: chainID,
438+
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
439+
})
440+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
441+
nodeSelector.On("Select").Return(node1).Once()
442+
nodeSelector.On("Select").Return(node2).Once()
443+
mn.nodeSelector = nodeSelector
444+
445+
first, err := mn.selectNode(ctx)
446+
require.NoError(t, err)
447+
assert.Same(t, node1, first)
448+
449+
second, err := mn.selectNode(ctx)
450+
require.NoError(t, err)
451+
assert.Same(t, node2, second)
452+
})
453+
t.Run("RandomRPC does not unsubscribe previous node on selection", func(t *testing.T) {
454+
t.Parallel()
455+
ctx := tests.Context(t)
456+
chainID := RandomID()
457+
node1 := newMockNode[ID, multiNodeRPCClient](t)
458+
node1.On("State").Return(nodeStateAlive).Maybe()
459+
node1.On("String").Return("node1").Maybe()
460+
node2 := newMockNode[ID, multiNodeRPCClient](t)
461+
node2.On("State").Return(nodeStateAlive).Maybe()
462+
node2.On("String").Return("node2").Maybe()
463+
mn := newTestMultiNode(t, multiNodeOpts{
464+
selectionMode: NodeSelectionModeRandomRPC,
465+
chainID: chainID,
466+
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
467+
})
468+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
469+
nodeSelector.On("Select").Return(node1).Once()
470+
nodeSelector.On("Select").Return(node2).Once()
471+
mn.nodeSelector = nodeSelector
472+
473+
_, err := mn.selectNode(ctx)
474+
require.NoError(t, err)
475+
_, err = mn.selectNode(ctx)
476+
require.NoError(t, err)
477+
478+
// UnsubscribeAllExceptAliveLoop must NOT have been called on either node.
479+
// mockNode would fail the test if an unexpected call was made.
480+
node1.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
481+
node2.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop")
482+
})
483+
t.Run("RandomRPC reports error when no nodes available", func(t *testing.T) {
484+
t.Parallel()
485+
ctx := tests.Context(t)
486+
chainID := RandomID()
487+
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
488+
mn := newTestMultiNode(t, multiNodeOpts{
489+
selectionMode: NodeSelectionModeRandomRPC,
490+
chainID: chainID,
491+
logger: lggr,
492+
})
493+
nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t)
494+
nodeSelector.On("Select").Return(nil).Once()
495+
nodeSelector.On("Name").Return("MockedNodeSelector").Once()
496+
mn.nodeSelector = nodeSelector
497+
node, err := mn.selectNode(ctx)
498+
require.EqualError(t, err, ErrNodeError.Error())
499+
require.Nil(t, node)
500+
tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available")
501+
})
502+
}
503+
409504
func TestMultiNode_ChainInfo(t *testing.T) {
410505
t.Parallel()
411506
type nodeParams struct {

multinode/node_selector.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const (
99
NodeSelectionModeRoundRobin = "RoundRobin"
1010
NodeSelectionModeTotalDifficulty = "TotalDifficulty"
1111
NodeSelectionModePriorityLevel = "PriorityLevel"
12+
NodeSelectionModeRandomRPC = "RandomRPC"
1213
)
1314

1415
type NodeSelector[
@@ -35,6 +36,8 @@ func newNodeSelector[
3536
return NewTotalDifficultyNodeSelector[CHAIN_ID, RPC](nodes)
3637
case NodeSelectionModePriorityLevel:
3738
return NewPriorityLevelNodeSelector[CHAIN_ID, RPC](nodes)
39+
case NodeSelectionModeRandomRPC:
40+
return NewRandomRPCSelector[CHAIN_ID, RPC](nodes)
3841
default:
3942
panic(fmt.Sprintf("unsupported NodeSelectionMode: %s", selectionMode))
4043
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package multinode
2+
3+
import (
4+
"math/rand/v2"
5+
)
6+
7+
type randomRPCSelector[
8+
CHAIN_ID ID,
9+
RPC any,
10+
] struct {
11+
nodes []Node[CHAIN_ID, RPC]
12+
}
13+
14+
func NewRandomRPCSelector[
15+
CHAIN_ID ID,
16+
RPC any,
17+
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] {
18+
return &randomRPCSelector[CHAIN_ID, RPC]{
19+
nodes: nodes,
20+
}
21+
}
22+
23+
func (s *randomRPCSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] {
24+
var liveNodes []Node[CHAIN_ID, RPC]
25+
for _, n := range s.nodes {
26+
if n.State() == nodeStateAlive {
27+
liveNodes = append(liveNodes, n)
28+
}
29+
}
30+
31+
if len(liveNodes) == 0 {
32+
return nil
33+
}
34+
35+
return liveNodes[rand.IntN(len(liveNodes))]
36+
}
37+
38+
func (s *randomRPCSelector[CHAIN_ID, RPC]) Name() string {
39+
return NodeSelectionModeRandomRPC
40+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package multinode
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestRandomRPCNodeSelectorName(t *testing.T) {
10+
selector := newNodeSelector[ID, RPCClient[ID, Head]](NodeSelectionModeRandomRPC, nil)
11+
assert.Equal(t, NodeSelectionModeRandomRPC, selector.Name())
12+
}
13+
14+
func TestRandomRPCNodeSelector(t *testing.T) {
15+
t.Parallel()
16+
17+
type nodeClient RPCClient[ID, Head]
18+
var nodes []Node[ID, nodeClient]
19+
20+
for i := 0; i < 3; i++ {
21+
node := newMockNode[ID, nodeClient](t)
22+
if i == 0 {
23+
node.On("State").Return(nodeStateOutOfSync)
24+
} else {
25+
node.On("State").Return(nodeStateAlive)
26+
}
27+
nodes = append(nodes, node)
28+
}
29+
30+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
31+
32+
// All selections should be from alive nodes only
33+
for i := 0; i < 20; i++ {
34+
selected := selector.Select()
35+
assert.NotNil(t, selected)
36+
assert.Contains(t, []Node[ID, nodeClient]{nodes[1], nodes[2]}, selected)
37+
}
38+
}
39+
40+
func TestRandomRPCNodeSelector_None(t *testing.T) {
41+
t.Parallel()
42+
43+
type nodeClient RPCClient[ID, Head]
44+
var nodes []Node[ID, nodeClient]
45+
46+
for i := 0; i < 3; i++ {
47+
node := newMockNode[ID, nodeClient](t)
48+
if i == 0 {
49+
node.On("State").Return(nodeStateOutOfSync)
50+
} else {
51+
node.On("State").Return(nodeStateUnreachable)
52+
}
53+
nodes = append(nodes, node)
54+
}
55+
56+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
57+
assert.Nil(t, selector.Select())
58+
}
59+
60+
func TestRandomRPCNodeSelector_Distribution(t *testing.T) {
61+
t.Parallel()
62+
63+
type nodeClient RPCClient[ID, Head]
64+
var nodes []Node[ID, nodeClient]
65+
66+
const nAlive = 3
67+
for i := 0; i < nAlive; i++ {
68+
node := newMockNode[ID, nodeClient](t)
69+
node.On("State").Return(nodeStateAlive)
70+
nodes = append(nodes, node)
71+
}
72+
73+
selector := newNodeSelector(NodeSelectionModeRandomRPC, nodes)
74+
75+
const iterations = 1000
76+
counts := make(map[Node[ID, nodeClient]]int, nAlive)
77+
for i := 0; i < iterations; i++ {
78+
selected := selector.Select()
79+
assert.NotNil(t, selected)
80+
counts[selected]++
81+
}
82+
83+
// Each node should be selected at least once with overwhelming probability
84+
for _, n := range nodes {
85+
assert.Greater(t, counts[n], 0, "expected every alive node to be selected at least once")
86+
}
87+
}
88+
89+
func TestRandomRPCNodeSelector_SingleNode(t *testing.T) {
90+
t.Parallel()
91+
92+
type nodeClient RPCClient[ID, Head]
93+
94+
node := newMockNode[ID, nodeClient](t)
95+
node.On("State").Return(nodeStateAlive)
96+
97+
selector := newNodeSelector(NodeSelectionModeRandomRPC, []Node[ID, nodeClient]{node})
98+
99+
for i := 0; i < 5; i++ {
100+
assert.Same(t, node, selector.Select())
101+
}
102+
}

0 commit comments

Comments
 (0)