Skip to content

Commit 8817033

Browse files
committed
DF-23489 multinode: make PollFailure hysteretic
Successful polls now decrement the failure count instead of fully resetting it, so that nodes (RPCs) are eventually declared unreachable if they sustain a poll failure-to-success ratio above 1:1.
1 parent fc8cb87 commit 8817033

2 files changed

Lines changed: 45 additions & 5 deletions

File tree

multinode/node_lifecycle.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
123123
lggr.Debugw("Ping successful", "nodeState", n.State())
124124
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
125125
n.metrics.IncrementPollsSuccess(ctx, n.name)
126-
pollFailures = 0
126+
// Decay rather than reset, so a sustained failure-to-success ratio above 1:1 still reaches the threshold
127+
if pollFailures > 0 {
128+
pollFailures--
129+
}
127130
}
128131
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
129132
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
@@ -356,7 +359,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN
356359
}
357360

358361
if outOfSync && n.getCachedState() == nodeStateAlive {
359-
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
362+
n.lfcLog.Errorw(
363+
"RPC endpoint has fallen behind",
364+
"blockNumber", localChainInfo.BlockNumber,
365+
"bestLatestBlockNumber", ci.BlockNumber,
366+
"totalDifficulty", localChainInfo.TotalDifficulty,
367+
"blockDifference", localChainInfo.BlockNumber-ci.BlockNumber,
368+
)
360369
}
361370
return outOfSync, ln
362371
}

multinode/node_lifecycle_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
110110
tests.AssertLogEventually(t, observedLogs, "Polling disabled")
111111
assert.Equal(t, nodeStateAlive, node.State())
112112
})
113-
t.Run("stays alive while below pollFailureThreshold and resets counter on success", func(t *testing.T) {
113+
t.Run("stays alive while below pollFailureThreshold, success decrements failure count", func(t *testing.T) {
114114
t.Parallel()
115115
rpc := newMockRPCClient[ID, Head](t)
116116
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
@@ -132,9 +132,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
132132
// stays healthy while below threshold
133133
assert.Equal(t, nodeStateAlive, node.State())
134134
}).Times(pollFailureThreshold - 1)
135-
// 2. Successful call that is expected to reset counter
135+
// 2. Successful call that is expected to decrement the counter (counter: 2 → 1)
136136
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
137-
// 3. Return error. If we have not reset the timer, we'll transition to nonAliveState
137+
// 3. Return error. Counter was decremented (not reset), so it reaches 2 — still below threshold.
138138
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Once()
139139
// 4. Once during the call, check if node is alive
140140
var ensuredAlive atomic.Bool
@@ -176,6 +176,37 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
176176
return nodeStateUnreachable == node.State()
177177
})
178178
})
179+
t.Run("transitions to unreachable when net poll failures accumulate despite intermittent successes", func(t *testing.T) {
180+
t.Parallel()
181+
rpc := newMockRPCClient[ID, Head](t)
182+
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
183+
const pollFailureThreshold = 3
184+
node := newSubscribedNode(t, testNodeOpts{
185+
config: testNodeConfig{
186+
pollFailureThreshold: pollFailureThreshold,
187+
pollInterval: tests.TestInterval,
188+
},
189+
rpc: rpc,
190+
})
191+
defer func() { assert.NoError(t, node.close()) }()
192+
193+
pollError := errors.New("failed to get ClientVersion")
194+
// Pattern F·F·S·F·F: with the decay counter the net failure debt reaches
195+
// threshold=3 at the 5th poll (counter: 1→2→1→2→3). With the old
196+
// reset-on-success behaviour the counter resets to 0 at S and peaks at only
197+
// 2 before the next success, never tripping.
198+
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2)
199+
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
200+
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2)
201+
// Unlimited successes after: ensures old code stays alive indefinitely so
202+
// the test correctly fails (times out) when run against the old behaviour.
203+
rpc.On("ClientVersion", mock.Anything).Return("", nil)
204+
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
205+
node.declareAlive()
206+
tests.AssertEventually(t, func() bool {
207+
return node.State() == nodeStateUnreachable
208+
})
209+
})
179210
t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) {
180211
t.Parallel()
181212
rpc := newMockRPCClient[ID, Head](t)

0 commit comments

Comments
 (0)