Skip to content

Commit e83ad43

Browse files
committed
DF-23489 unreachable node recovery depends on successful polls
1 parent 3245ce4 commit e83ad43

6 files changed

Lines changed: 255 additions & 1 deletion

File tree

multinode/README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an
1919
Used to poll for new heads and finalized heads within subscriptions.
2020

2121
### Transaction Sender
22-
Used to send transactions to all healthy RPCs and aggregate the results.
22+
Used to send transactions to all healthy RPCs and aggregate the results.
23+
24+
## States diagram
25+
26+
```mermaid
27+
graph TD
28+
Undialed --> Dialed
29+
Undialed --> Unreachable
30+
Dialed --> Alive
31+
Dialed --> InvalidChainID
32+
Dialed --> Syncing
33+
Dialed --> Unreachable
34+
Alive --> OutOfSync
35+
Alive --> Unreachable
36+
OutOfSync --> Alive
37+
OutOfSync --> InvalidChainID
38+
OutOfSync --> Syncing
39+
OutOfSync --> Unreachable
40+
InvalidChainID --> Alive
41+
InvalidChainID --> Syncing
42+
InvalidChainID --> Unreachable
43+
Syncing --> Alive
44+
Syncing --> OutOfSync
45+
Syncing --> InvalidChainID
46+
Syncing --> Unreachable
47+
Unreachable --> Dialed
48+
Unusable:::terminal
49+
Closed:::terminal
50+
```

multinode/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type MultiNode struct {
1717

1818
// Node Configs
1919
PollFailureThreshold *uint32
20+
PollSuccessThreshold *uint32
2021
PollInterval *config.Duration
2122
SelectionMode *string
2223
SyncThreshold *uint32
@@ -44,6 +45,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 {
4445
return *c.MultiNode.PollFailureThreshold
4546
}
4647

48+
func (c *MultiNodeConfig) PollSuccessThreshold() uint32 {
49+
return *c.MultiNode.PollSuccessThreshold
50+
}
51+
4752
func (c *MultiNodeConfig) PollInterval() time.Duration {
4853
return c.MultiNode.PollInterval.Duration()
4954
}
@@ -103,6 +108,9 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) {
103108
if f.MultiNode.PollFailureThreshold != nil {
104109
c.MultiNode.PollFailureThreshold = f.MultiNode.PollFailureThreshold
105110
}
111+
if f.MultiNode.PollSuccessThreshold != nil {
112+
c.MultiNode.PollSuccessThreshold = f.MultiNode.PollSuccessThreshold
113+
}
106114
if f.MultiNode.PollInterval != nil {
107115
c.MultiNode.PollInterval = f.MultiNode.PollInterval
108116
}

multinode/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id")
1818

1919
type NodeConfig interface {
2020
PollFailureThreshold() uint32
21+
PollSuccessThreshold() uint32
2122
PollInterval() time.Duration
2223
SelectionMode() string
2324
SyncThreshold() uint32

multinode/node_lifecycle.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
527527
}
528528
}
529529

530+
// probeUntilStable polls the node PollSuccessThreshold consecutive times before allowing it back into
531+
// the alive pool. Returns true if all probes pass, false if any probe fails or ctx is cancelled.
532+
// When threshold is 0 the probe is disabled and the function returns true immediately.
533+
func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool {
534+
threshold := n.nodePoolCfg.PollSuccessThreshold()
535+
if threshold == 0 {
536+
return true
537+
}
538+
pollInterval := n.nodePoolCfg.PollInterval()
539+
var successes uint32
540+
for successes < threshold {
541+
select {
542+
case <-ctx.Done():
543+
return false
544+
case <-time.After(pollInterval):
545+
}
546+
n.metrics.IncrementPolls(ctx, n.name)
547+
pollCtx, cancel := context.WithTimeout(ctx, pollInterval)
548+
version, err := n.RPC().ClientVersion(pollCtx)
549+
cancel()
550+
if err != nil {
551+
n.metrics.IncrementPollsFailed(ctx, n.name)
552+
lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", successes, "threshold", threshold)
553+
return false
554+
}
555+
n.metrics.IncrementPollsSuccess(ctx, n.name)
556+
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
557+
successes++
558+
lggr.Debugw("Recovery probe poll succeeded", "successes", successes, "threshold", threshold)
559+
}
560+
return true
561+
}
562+
530563
func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
531564
defer n.wg.Done()
532565
ctx, cancel := n.newCtx()
@@ -572,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
572605
n.setState(nodeStateUnreachable)
573606
continue
574607
case nodeStateAlive:
608+
if !n.probeUntilStable(ctx, lggr) {
609+
n.rpc.Close()
610+
n.setState(nodeStateUnreachable)
611+
continue
612+
}
575613
lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState())
576614
fallthrough
577615
default:

multinode/node_lifecycle_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package multinode
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"math/big"
@@ -1495,6 +1496,179 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) {
14951496
return node.State() == nodeStateAlive
14961497
})
14971498
})
1499+
t.Run("with PollSuccessThreshold set, without isSyncing, node becomes alive once all probe polls succeed", func(t *testing.T) {
1500+
t.Parallel()
1501+
rpc := newMockRPCClient[ID, Head](t)
1502+
nodeChainID := RandomID()
1503+
const pollSuccessThreshold = 2
1504+
node := newAliveNode(t, testNodeOpts{
1505+
rpc: rpc,
1506+
chainID: nodeChainID,
1507+
config: testNodeConfig{
1508+
pollSuccessThreshold: pollSuccessThreshold,
1509+
pollInterval: tests.TestInterval,
1510+
},
1511+
})
1512+
defer func() { assert.NoError(t, node.close()) }()
1513+
1514+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1515+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1516+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
1517+
setupRPCForAliveLoop(t, rpc)
1518+
1519+
node.declareUnreachable()
1520+
tests.AssertEventually(t, func() bool {
1521+
return node.State() == nodeStateAlive
1522+
})
1523+
})
1524+
t.Run("with PollSuccessThreshold set, node becomes alive once all probe polls succeed", func(t *testing.T) {
1525+
t.Parallel()
1526+
rpc := newMockRPCClient[ID, Head](t)
1527+
nodeChainID := RandomID()
1528+
const pollSuccessThreshold = 2
1529+
node := newAliveNode(t, testNodeOpts{
1530+
rpc: rpc,
1531+
chainID: nodeChainID,
1532+
config: testNodeConfig{
1533+
nodeIsSyncingEnabled: true,
1534+
pollSuccessThreshold: pollSuccessThreshold,
1535+
pollInterval: tests.TestInterval,
1536+
},
1537+
})
1538+
defer func() { assert.NoError(t, node.close()) }()
1539+
1540+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1541+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1542+
rpc.On("IsSyncing", mock.Anything).Return(false, nil)
1543+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
1544+
setupRPCForAliveLoop(t, rpc)
1545+
1546+
node.declareUnreachable()
1547+
tests.AssertEventually(t, func() bool {
1548+
return node.State() == nodeStateAlive
1549+
})
1550+
})
1551+
t.Run("with PollSuccessThreshold set, probe poll failure keeps node unreachable and restarts redial", func(t *testing.T) {
1552+
t.Parallel()
1553+
rpc := newMockRPCClient[ID, Head](t)
1554+
nodeChainID := RandomID()
1555+
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
1556+
const pollSuccessThreshold = 2
1557+
node := newAliveNode(t, testNodeOpts{
1558+
rpc: rpc,
1559+
chainID: nodeChainID,
1560+
lggr: lggr,
1561+
config: testNodeConfig{
1562+
pollSuccessThreshold: pollSuccessThreshold,
1563+
pollInterval: tests.TestInterval,
1564+
},
1565+
})
1566+
defer func() { assert.NoError(t, node.close()) }()
1567+
1568+
rpc.On("Dial", mock.Anything).Return(nil).Once()
1569+
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
1570+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
1571+
rpc.On("ClientVersion", mock.Anything).Return("", errors.New("probe poll failed")).Once()
1572+
// after the probe aborts, rpc.Close() is called and the redial backoff fires again; keep failing
1573+
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial"))
1574+
// guard: if current code (no probe) enters aliveLoop, fail the subscribe so the node returns to unreachable
1575+
rpc.On("SubscribeToHeads", mock.Anything).Return(nil, nil, errors.New("unexpected")).Maybe()
1576+
1577+
node.declareUnreachable()
1578+
tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial")
1579+
assert.Equal(t, nodeStateUnreachable, node.State())
1580+
})
1581+
}
1582+
1583+
func TestUnit_NodeLifecycle_probeUntilStable(t *testing.T) {
1584+
t.Parallel()
1585+
1586+
t.Run("returns true immediately when threshold is zero, skipping probe", func(t *testing.T) {
1587+
t.Parallel()
1588+
rpc := newMockRPCClient[ID, Head](t)
1589+
// ClientVersion is intentionally NOT mocked: probing must be entirely skipped.
1590+
node := newTestNode(t, testNodeOpts{
1591+
rpc: rpc,
1592+
config: testNodeConfig{
1593+
pollSuccessThreshold: 0,
1594+
pollInterval: tests.TestInterval,
1595+
},
1596+
})
1597+
result := node.probeUntilStable(t.Context(), logger.Test(t))
1598+
assert.True(t, result)
1599+
})
1600+
t.Run("returns false when context is already cancelled", func(t *testing.T) {
1601+
t.Parallel()
1602+
rpc := newMockRPCClient[ID, Head](t)
1603+
// ClientVersion must never be called: ctx is done before the first timer fires.
1604+
node := newTestNode(t, testNodeOpts{
1605+
rpc: rpc,
1606+
config: testNodeConfig{
1607+
pollSuccessThreshold: 2,
1608+
pollInterval: tests.TestInterval,
1609+
},
1610+
})
1611+
ctx, cancel := context.WithCancel(t.Context())
1612+
cancel()
1613+
result := node.probeUntilStable(ctx, logger.Test(t))
1614+
assert.False(t, result)
1615+
})
1616+
t.Run("returns false when first poll fails", func(t *testing.T) {
1617+
t.Parallel()
1618+
rpc := newMockRPCClient[ID, Head](t)
1619+
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
1620+
node := newTestNode(t, testNodeOpts{
1621+
rpc: rpc,
1622+
lggr: lggr,
1623+
config: testNodeConfig{
1624+
pollSuccessThreshold: 2,
1625+
pollInterval: tests.TestInterval,
1626+
},
1627+
})
1628+
rpc.On("ClientVersion", mock.Anything).Return("", errors.New("rpc unavailable")).Once()
1629+
result := node.probeUntilStable(t.Context(), lggr)
1630+
assert.False(t, result)
1631+
tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial")
1632+
})
1633+
t.Run("returns true when all threshold polls succeed", func(t *testing.T) {
1634+
t.Parallel()
1635+
rpc := newMockRPCClient[ID, Head](t)
1636+
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
1637+
const threshold = 3
1638+
node := newTestNode(t, testNodeOpts{
1639+
rpc: rpc,
1640+
lggr: lggr,
1641+
config: testNodeConfig{
1642+
pollSuccessThreshold: threshold,
1643+
pollInterval: tests.TestInterval,
1644+
},
1645+
})
1646+
rpc.On("ClientVersion", mock.Anything).Return("v1.0.0", nil).Times(threshold)
1647+
result := node.probeUntilStable(t.Context(), lggr)
1648+
assert.True(t, result)
1649+
tests.AssertLogCountEventually(t, observedLogs, "Recovery probe poll succeeded", threshold)
1650+
})
1651+
t.Run("returns false when a later probe poll fails, logging correct successesSoFar", func(t *testing.T) {
1652+
t.Parallel()
1653+
rpc := newMockRPCClient[ID, Head](t)
1654+
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
1655+
const threshold = 3
1656+
node := newTestNode(t, testNodeOpts{
1657+
rpc: rpc,
1658+
lggr: lggr,
1659+
config: testNodeConfig{
1660+
pollSuccessThreshold: threshold,
1661+
pollInterval: tests.TestInterval,
1662+
},
1663+
})
1664+
rpc.On("ClientVersion", mock.Anything).Return("v1.0.0", nil).Times(threshold - 1)
1665+
rpc.On("ClientVersion", mock.Anything).Return("", errors.New("rpc unavailable")).Once()
1666+
result := node.probeUntilStable(t.Context(), lggr)
1667+
assert.False(t, result)
1668+
// threshold-1 successes logged before the failure
1669+
tests.AssertLogCountEventually(t, observedLogs, "Recovery probe poll succeeded", threshold-1)
1670+
tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial")
1671+
})
14981672
}
14991673

15001674
func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) {

multinode/node_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
type testNodeConfig struct {
1818
pollFailureThreshold uint32
19+
pollSuccessThreshold uint32
1920
pollInterval time.Duration
2021
selectionMode string
2122
syncThreshold uint32
@@ -34,6 +35,10 @@ func (n testNodeConfig) PollFailureThreshold() uint32 {
3435
return n.pollFailureThreshold
3536
}
3637

38+
func (n testNodeConfig) PollSuccessThreshold() uint32 {
39+
return n.pollSuccessThreshold
40+
}
41+
3742
func (n testNodeConfig) PollInterval() time.Duration {
3843
return n.pollInterval
3944
}

0 commit comments

Comments
 (0)