Skip to content

Commit e242962

Browse files
committed
DF-23489 unreachable node recovery depends on successful polls
1 parent 8817033 commit e242962

6 files changed

Lines changed: 163 additions & 1 deletion

File tree

multinode/README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an
1919
Used to poll for new heads and finalized heads within subscriptions.
2020

2121
### Transaction Sender
22-
Used to send transactions to all healthy RPCs and aggregate the results.
22+
Used to send transactions to all healthy RPCs and aggregate the results.
23+
24+
## States diagram
25+
26+
```mermaid
27+
graph TD
28+
Undialed --> Dialed
29+
Undialed --> Unreachable
30+
Dialed --> Alive
31+
Dialed --> InvalidChainID
32+
Dialed --> Syncing
33+
Dialed --> Unreachable
34+
Alive --> OutOfSync
35+
Alive --> Unreachable
36+
OutOfSync --> Alive
37+
OutOfSync --> InvalidChainID
38+
OutOfSync --> Syncing
39+
OutOfSync --> Unreachable
40+
InvalidChainID --> Alive
41+
InvalidChainID --> Syncing
42+
InvalidChainID --> Unreachable
43+
Syncing --> Alive
44+
Syncing --> OutOfSync
45+
Syncing --> InvalidChainID
46+
Syncing --> Unreachable
47+
Unreachable --> Dialed
48+
Unusable:::terminal
49+
Closed:::terminal
50+
```

multinode/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type MultiNode struct {
1717

1818
// Node Configs
1919
PollFailureThreshold *uint32
20+
PollSuccessThreshold *uint32
2021
PollInterval *config.Duration
2122
SelectionMode *string
2223
SyncThreshold *uint32
@@ -44,6 +45,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 {
4445
return *c.MultiNode.PollFailureThreshold
4546
}
4647

48+
func (c *MultiNodeConfig) PollSuccessThreshold() uint32 {
49+
return *c.MultiNode.PollSuccessThreshold
50+
}
51+
4752
func (c *MultiNodeConfig) PollInterval() time.Duration {
4853
return c.MultiNode.PollInterval.Duration()
4954
}
@@ -103,6 +108,9 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) {
103108
if f.MultiNode.PollFailureThreshold != nil {
104109
c.MultiNode.PollFailureThreshold = f.MultiNode.PollFailureThreshold
105110
}
111+
if f.MultiNode.PollSuccessThreshold != nil {
112+
c.MultiNode.PollSuccessThreshold = f.MultiNode.PollSuccessThreshold
113+
}
106114
if f.MultiNode.PollInterval != nil {
107115
c.MultiNode.PollInterval = f.MultiNode.PollInterval
108116
}

multinode/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id")
1818

1919
type NodeConfig interface {
2020
PollFailureThreshold() uint32
21+
PollSuccessThreshold() uint32
2122
PollInterval() time.Duration
2223
SelectionMode() string
2324
SyncThreshold() uint32

multinode/node_lifecycle.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
527527
}
528528
}
529529

530+
// probeUntilStable polls the node PollSuccessThreshold consecutive times before allowing it back into
531+
// the alive pool. Returns true if all probes pass, false if any probe fails or ctx is cancelled.
532+
// When threshold is 0 the probe is disabled and the function returns true immediately.
533+
func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool {
534+
threshold := n.nodePoolCfg.PollSuccessThreshold()
535+
if threshold == 0 {
536+
return true
537+
}
538+
pollInterval := n.nodePoolCfg.PollInterval()
539+
var successes uint32
540+
for successes < threshold {
541+
select {
542+
case <-ctx.Done():
543+
return false
544+
case <-time.After(pollInterval):
545+
}
546+
n.metrics.IncrementPolls(ctx, n.name)
547+
pollCtx, cancel := context.WithTimeout(ctx, pollInterval)
548+
version, err := n.RPC().ClientVersion(pollCtx)
549+
cancel()
550+
if err != nil {
551+
n.metrics.IncrementPollsFailed(ctx, n.name)
552+
lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", successes, "threshold", threshold)
553+
return false
554+
}
555+
n.metrics.IncrementPollsSuccess(ctx, n.name)
556+
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
557+
successes++
558+
lggr.Debugw("Recovery probe poll succeeded", "successes", successes, "threshold", threshold)
559+
}
560+
return true
561+
}
562+
530563
func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
531564
defer n.wg.Done()
532565
ctx, cancel := n.newCtx()
@@ -572,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
572605
n.setState(nodeStateUnreachable)
573606
continue
574607
case nodeStateAlive:
608+
if !n.probeUntilStable(ctx, lggr) {
609+
n.rpc.Close()
610+
n.setState(nodeStateUnreachable)
611+
continue
612+
}
575613
lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState())
576614
fallthrough
577615
default:

multinode/node_lifecycle_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,88 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) {
14951495
return node.State() == nodeStateAlive
14961496
})
14971497
})
1498+
t.Run("with PollSuccessThreshold set, without isSyncing, node becomes alive once all probe polls succeed", func(t *testing.T) {
1499+
t.Parallel()
1500+
rpc := newMockRPCClient[ID, Head](t)
1501+
nodeChainID := RandomID()
1502+
const pollSuccessThreshold = 2
1503+
node := newAliveNode(t, testNodeOpts{
1504+
rpc: rpc,
1505+
chainID: nodeChainID,
1506+
config: testNodeConfig{
1507+
pollSuccessThreshold: pollSuccessThreshold,
1508+
pollInterval: tests.TestInterval,
1509+
},
1510+
})
1511+
defer func() { assert.NoError(t, node.close()) }()
1512+
1513+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1514+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1515+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
1516+
setupRPCForAliveLoop(t, rpc)
1517+
1518+
node.declareUnreachable()
1519+
tests.AssertEventually(t, func() bool {
1520+
return node.State() == nodeStateAlive
1521+
})
1522+
})
1523+
t.Run("with PollSuccessThreshold set, node becomes alive once all probe polls succeed", func(t *testing.T) {
1524+
t.Parallel()
1525+
rpc := newMockRPCClient[ID, Head](t)
1526+
nodeChainID := RandomID()
1527+
const pollSuccessThreshold = 2
1528+
node := newAliveNode(t, testNodeOpts{
1529+
rpc: rpc,
1530+
chainID: nodeChainID,
1531+
config: testNodeConfig{
1532+
nodeIsSyncingEnabled: true,
1533+
pollSuccessThreshold: pollSuccessThreshold,
1534+
pollInterval: tests.TestInterval,
1535+
},
1536+
})
1537+
defer func() { assert.NoError(t, node.close()) }()
1538+
1539+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1540+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1541+
rpc.On("IsSyncing", mock.Anything).Return(false, nil)
1542+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
1543+
setupRPCForAliveLoop(t, rpc)
1544+
1545+
node.declareUnreachable()
1546+
tests.AssertEventually(t, func() bool {
1547+
return node.State() == nodeStateAlive
1548+
})
1549+
})
1550+
t.Run("with PollSuccessThreshold set, probe poll failure keeps node unreachable and restarts redial", func(t *testing.T) {
1551+
t.Parallel()
1552+
rpc := newMockRPCClient[ID, Head](t)
1553+
nodeChainID := RandomID()
1554+
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
1555+
const pollSuccessThreshold = 2
1556+
node := newAliveNode(t, testNodeOpts{
1557+
rpc: rpc,
1558+
chainID: nodeChainID,
1559+
lggr: lggr,
1560+
config: testNodeConfig{
1561+
pollSuccessThreshold: pollSuccessThreshold,
1562+
pollInterval: tests.TestInterval,
1563+
},
1564+
})
1565+
defer func() { assert.NoError(t, node.close()) }()
1566+
1567+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1568+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1569+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
1570+
rpc.On("ClientVersion", mock.Anything).Return("", errors.New("probe poll failed")).Once()
1571+
// after the probe aborts, rpc.Close() is called and the redial backoff fires again; keep failing
1572+
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial"))
1573+
// guard: if current code (no probe) enters aliveLoop, fail the subscribe so the node returns to unreachable
1574+
rpc.On("SubscribeToHeads", mock.Anything).Return(nil, nil, errors.New("unexpected")).Maybe()
1575+
1576+
node.declareUnreachable()
1577+
tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial")
1578+
assert.Equal(t, nodeStateUnreachable, node.State())
1579+
})
14981580
}
14991581

15001582
func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) {

multinode/node_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
type testNodeConfig struct {
1818
pollFailureThreshold uint32
19+
pollSuccessThreshold uint32
1920
pollInterval time.Duration
2021
selectionMode string
2122
syncThreshold uint32
@@ -34,6 +35,10 @@ func (n testNodeConfig) PollFailureThreshold() uint32 {
3435
return n.pollFailureThreshold
3536
}
3637

38+
func (n testNodeConfig) PollSuccessThreshold() uint32 {
39+
return n.pollSuccessThreshold
40+
}
41+
3742
func (n testNodeConfig) PollInterval() time.Duration {
3843
return n.pollInterval
3944
}

0 commit comments

Comments
 (0)