Skip to content

Commit 6b8bcaa

Browse files
update
1 parent 5add104 commit 6b8bcaa

3 files changed

Lines changed: 11 additions & 23 deletions

File tree

multinode/multi_node.go

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err
193193
// selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector.
194194
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) {
195195
if c.selectionMode == NodeSelectionModeRandomRPC {
196-
return c.selectRandomRPCNode(ctx)
196+
return c.awaitNodeSelection(ctx)
197197
}
198198

199199
c.activeMu.RLock()
@@ -217,34 +217,18 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
217217
c.activeNode.UnsubscribeAllExceptAliveLoop()
218218
}
219219

220-
for {
221-
c.activeNode = c.nodeSelector.Select()
222-
if c.activeNode != nil {
223-
break
224-
}
225-
if slices.ContainsFunc(c.primaryNodes, func(n Node[CHAIN_ID, RPC]) bool {
226-
return n.State().isInitializing()
227-
}) {
228-
// initial dial still in-progress - retry until done
229-
select {
230-
case <-ctx.Done():
231-
return nil, ctx.Err()
232-
case <-time.After(100 * time.Millisecond):
233-
continue
234-
}
235-
}
236-
c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
237-
c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String()))
238-
return nil, ErrNodeError
220+
c.activeNode, err = c.awaitNodeSelection(ctx)
221+
if err != nil {
222+
return nil, err
239223
}
240224

241225
c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
242226
return c.activeNode, err
243227
}
244228

245-
// selectRandomRPCNode picks a random healthy node on every call without caching
246-
// or terminating existing subscriptions on other nodes.
247-
func (c *MultiNode[CHAIN_ID, RPC]) selectRandomRPCNode(ctx context.Context) (Node[CHAIN_ID, RPC], error) {
229+
// awaitNodeSelection blocks until nodeSelector returns a live node or all nodes
230+
// finish initializing. Returns ErrNodeError when no live nodes are available.
231+
func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node[CHAIN_ID, RPC], error) {
248232
for {
249233
node := c.nodeSelector.Select()
250234
if node != nil {

multinode/node_selector_random_rpc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func (s *randomRPCSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] {
2525
for _, n := range s.nodes {
2626
if n.State() == nodeStateAlive {
2727
liveNodes = append(liveNodes, n)
28+
} else {
29+
n.UnsubscribeAllExceptAliveLoop()
2830
}
2931
}
3032

multinode/node_selector_random_rpc_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func TestRandomRPCNodeSelector(t *testing.T) {
2121
node := newMockNode[ID, nodeClient](t)
2222
if i == 0 {
2323
node.On("State").Return(nodeStateOutOfSync)
24+
node.On("UnsubscribeAllExceptAliveLoop")
2425
} else {
2526
node.On("State").Return(nodeStateAlive)
2627
}
@@ -50,6 +51,7 @@ func TestRandomRPCNodeSelector_None(t *testing.T) {
5051
} else {
5152
node.On("State").Return(nodeStateUnreachable)
5253
}
54+
node.On("UnsubscribeAllExceptAliveLoop")
5355
nodes = append(nodes, node)
5456
}
5557

0 commit comments

Comments
 (0)