Skip to content

Commit 3c71fef

Browse files
Head Tracker Finality Violation Detection (#28)
* verify finalized block hashes * Fix spacing * lint * Allow nil prevHeadWithChain * Update tracker.go * Check prevLatestFinalized is not nil * Update tracker.go * Update tracker.go * Add instant finality check * Update tracker.go * Update finality error handling * Emit finality health error * Verify hashes before saving head * Handle re-org pre finality * Update tracker.go * ignore out of order heads * verify with finality depth * Update tracker.go * Update tracker.go * Update tracker.go * Update tracker.go * Update tracker.go * relax invariant violation * Update tracker.go * Update tracker.go
1 parent 3d6cea2 commit 3c71fef

1 file changed

Lines changed: 89 additions & 18 deletions

File tree

chains/heads/tracker.go

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1414
"github.com/smartcontractkit/chainlink-common/pkg/services"
15+
"github.com/smartcontractkit/chainlink-common/pkg/types"
1516
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
1617

1718
"github.com/smartcontractkit/chainlink-framework/chains"
@@ -36,7 +37,7 @@ const HeadsBufferSize = 10
3637
type Tracker[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
3738
services.Service
3839
// Backfill given a head will fill in any missing heads up to latestFinalized
39-
Backfill(ctx context.Context, headWithChain H) (err error)
40+
Backfill(ctx context.Context, headWithChain H, prevHeadWithChain H) (err error)
4041
LatestChain() H
4142
// LatestAndFinalizedBlock - returns latest and latest finalized blocks.
4243
// NOTE: Returns latest finalized block as is, ignoring the FinalityTagBypass feature flag.
@@ -59,6 +60,11 @@ type TrackerConfig interface {
5960
PersistenceEnabled() bool
6061
}
6162

63+
type headPair[HTH any] struct {
64+
head HTH
65+
prevHead HTH
66+
}
67+
6268
type tracker[
6369
HTH Head[BLOCK_HASH, ID],
6470
S chains.Subscription,
@@ -77,7 +83,7 @@ type tracker[
7783
config ChainConfig
7884
htConfig TrackerConfig
7985

80-
backfillMB *mailbox.Mailbox[HTH]
86+
backfillMB *mailbox.Mailbox[headPair[HTH]]
8187
broadcastMB *mailbox.Mailbox[HTH]
8288
headListener Listener[HTH, BLOCK_HASH]
8389
getNilHead func() HTH
@@ -105,7 +111,7 @@ func NewTracker[
105111
chainID: client.ConfiguredChainID(),
106112
config: config,
107113
htConfig: htConfig,
108-
backfillMB: mailbox.NewSingle[HTH](),
114+
backfillMB: mailbox.NewSingle[headPair[HTH]](),
109115
broadcastMB: mailbox.New[HTH](HeadsBufferSize),
110116
headSaver: headSaver,
111117
mailMon: mailMon,
@@ -194,7 +200,43 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) close() error {
194200
return t.broadcastMB.Close()
195201
}
196202

197-
func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH) (err error) {
203+
// verifyFinalizedBlockHashes returns finality violated error if a block hash mismatch is found in provided chains
204+
func (t *tracker[HTH, S, ID, BLOCK_HASH]) verifyFinalizedBlockHashes(finalizedHeadWithChain chains.Head[BLOCK_HASH], prevHeadWithChain chains.Head[BLOCK_HASH]) error {
205+
if finalizedHeadWithChain == nil || prevHeadWithChain == nil {
206+
return nil
207+
}
208+
209+
prevLatestFinalized := prevHeadWithChain.LatestFinalizedHead()
210+
if prevLatestFinalized == nil {
211+
return nil
212+
}
213+
214+
prevLatestFinalizedBlockNum := prevLatestFinalized.BlockNumber()
215+
prevLatestFinalizedHash := prevLatestFinalized.BlockHash()
216+
finalizedHead, err := finalizedHeadWithChain.HeadAtHeight(prevLatestFinalizedBlockNum)
217+
if err != nil {
218+
return nil
219+
}
220+
221+
finalizedBlockNum := finalizedHead.BlockNumber()
222+
if finalizedBlockNum < prevLatestFinalizedBlockNum {
223+
return fmt.Errorf("latest finalized head at height %d is behind previously seen finalized head at height %d: %w",
224+
finalizedBlockNum, prevLatestFinalizedBlockNum, types.ErrFinalityViolated)
225+
}
226+
227+
finalizedHash := finalizedHead.BlockHash()
228+
if finalizedHash != prevLatestFinalizedHash {
229+
return fmt.Errorf("block hash mismatch at height %d: expected %s, got %s: %w",
230+
prevLatestFinalizedBlockNum, prevLatestFinalizedHash, finalizedHash, types.ErrFinalityViolated)
231+
}
232+
return nil
233+
}
234+
235+
func (t *tracker[HTH, S, ID, BLOCK_HASH]) instantFinality() bool {
236+
return !t.config.FinalityTagEnabled() && t.config.FinalityDepth() == 0 && t.config.FinalizedBlockOffset() == 0
237+
}
238+
239+
func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, prevHeadWithChain HTH) (err error) {
198240
latestFinalized, err := t.calculateLatestFinalized(ctx, headWithChain, t.htConfig.FinalityTagBypass())
199241
if err != nil {
200242
return fmt.Errorf("failed to calculate finalized block: %w", err)
@@ -205,18 +247,27 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWith
205247
}
206248

207249
if headWithChain.BlockNumber() < latestFinalized.BlockNumber() {
208-
const errMsg = "invariant violation: expected head of canonical chain to be ahead of the latestFinalized"
250+
const warnMsg = "expected head of canonical chain to be ahead of the latestFinalized, but this may be normal on chains with fast finality due to fetch timing"
209251
t.log.With("head_block_num", headWithChain.BlockNumber(),
210252
"latest_finalized_block_number", latestFinalized.BlockNumber()).
211-
Criticalf(errMsg)
212-
return errors.New(errMsg)
253+
Warnf(warnMsg)
254+
return errors.New(warnMsg)
213255
}
214256

215257
if headWithChain.BlockNumber()-latestFinalized.BlockNumber() > int64(t.htConfig.MaxAllowedFinalityDepth()) {
216258
return fmt.Errorf("gap between latest finalized block (%d) and current head (%d) is too large (> %d)",
217259
latestFinalized.BlockNumber(), headWithChain.BlockNumber(), t.htConfig.MaxAllowedFinalityDepth())
218260
}
219261

262+
if !t.instantFinality() {
263+
// verify block hashes since calculateLatestFinalized made an additional RPC call
264+
err = t.verifyFinalizedBlockHashes(latestFinalized, prevHeadWithChain.LatestFinalizedHead())
265+
if err != nil {
266+
t.eng.EmitHealthErr(err)
267+
return err
268+
}
269+
}
270+
220271
return t.backfill(ctx, headWithChain, latestFinalized)
221272
}
222273

@@ -235,6 +286,26 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, hea
235286
"blockDifficulty", head.BlockDifficulty(),
236287
)
237288

289+
var prevLatestFinalized chains.Head[BLOCK_HASH]
290+
if prevHead.IsValid() {
291+
prevLatestFinalized = prevHead.LatestFinalizedHead()
292+
}
293+
294+
if prevLatestFinalized != nil && head.BlockNumber() < prevLatestFinalized.BlockNumber() {
295+
promOldHead.WithLabelValues(t.chainID.String()).Inc()
296+
t.log.Critical("Got very old block. Either a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", "err", types.ErrFinalityViolated)
297+
oldBlockErr := fmt.Errorf("got very old block with number %d (highest seen was %d)", head.BlockNumber(), prevHead.BlockNumber())
298+
err := fmt.Errorf("%w: %w", oldBlockErr, types.ErrFinalityViolated)
299+
t.eng.EmitHealthErr(err)
300+
return err
301+
}
302+
303+
if err := t.verifyFinalizedBlockHashes(head.LatestFinalizedHead(), prevHead); err != nil {
304+
t.log.Critical(err)
305+
t.eng.EmitHealthErr(err)
306+
return err
307+
}
308+
238309
if err := t.headSaver.Save(ctx, head); ctx.Err() != nil {
239310
return nil
240311
} else if err != nil {
@@ -248,7 +319,7 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, hea
248319
if !headWithChain.IsValid() {
249320
return fmt.Errorf("heads.tracker#handleNewHighestHead headWithChain was unexpectedly nil")
250321
}
251-
t.backfillMB.Deliver(headWithChain)
322+
t.backfillMB.Deliver(headPair[HTH]{headWithChain, prevHead})
252323
t.broadcastMB.Deliver(headWithChain)
253324
} else if head.BlockNumber() == prevHead.BlockNumber() {
254325
if head.BlockHash() != prevHead.BlockHash() {
@@ -258,13 +329,13 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, hea
258329
}
259330
} else {
260331
t.log.Debugw("Got out of order head", "blockNum", head.BlockNumber(), "head", head.BlockHash(), "prevHead", prevHead.BlockNumber())
261-
prevLatestFinalized := prevHead.LatestFinalizedHead()
262-
263-
if prevLatestFinalized != nil && head.BlockNumber() <= prevLatestFinalized.BlockNumber() {
264-
promOldHead.WithLabelValues(t.chainID.String()).Inc()
265-
err := fmt.Errorf("got very old block with number %d (highest seen was %d)", head.BlockNumber(), prevHead.BlockNumber())
266-
t.log.Critical("Got very old block. Either a very deep re-org occurred, one of the RPC nodes has gotten far out of sync, or the chain went backwards in block numbers. This node may not function correctly without manual intervention.", "err", err)
267-
t.eng.EmitHealthErr(err)
332+
promOldHead.WithLabelValues(t.chainID.String()).Inc()
333+
if prevLatestFinalized == nil {
334+
// sanity check
335+
finalityDepth := int64(t.config.FinalityDepth())
336+
if head.BlockNumber() < prevHead.BlockNumber()-finalityDepth {
337+
t.log.Warnf("Received old block at height %d past finality depth of %d. Either a re-org occurred, one of the RPC nodes has gotten out of sync, or the chain went backwards in block numbers.", head.BlockNumber(), finalityDepth)
338+
}
268339
}
269340
}
270341
return nil
@@ -314,12 +385,12 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) backfillLoop(ctx context.Context) {
314385
return
315386
case <-t.backfillMB.Notify():
316387
for {
317-
head, exists := t.backfillMB.Retrieve()
388+
backfillHeadPair, exists := t.backfillMB.Retrieve()
318389
if !exists {
319390
break
320391
}
321392
{
322-
err := t.Backfill(ctx, head)
393+
err := t.Backfill(ctx, backfillHeadPair.head, backfillHeadPair.prevHead)
323394
if err != nil {
324395
t.log.Warnw("Unexpected error while backfilling heads", "err", err)
325396
} else if ctx.Err() != nil {
@@ -399,7 +470,7 @@ func (t *tracker[HTH, S, ID, BLOCK_HASH]) calculateLatestFinalized(ctx context.C
399470
return t.getHeadAtHeight(ctx, latestFinalized.BlockHash(), finalizedBlockNumber)
400471
}
401472
// no need to make an additional RPC call on chains with instant finality
402-
if t.config.FinalityDepth() == 0 && t.config.FinalizedBlockOffset() == 0 {
473+
if t.instantFinality() {
403474
return currentHead, nil
404475
}
405476
finalizedBlockNumber := currentHead.BlockNumber() - int64(t.config.FinalityDepth()) - int64(t.config.FinalizedBlockOffset())

0 commit comments

Comments
 (0)