Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions multinode/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ packages:
nodeMetrics:
sendOnlyNodeMetrics:
transactionSenderMetrics:
FinalizedStateChecker:
82 changes: 82 additions & 0 deletions multinode/mock_finalized_state_checker_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions multinode/mock_node_metrics_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions multinode/mock_rpc_client_ext_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package multinode

import (
"context"

mock "github.com/stretchr/testify/mock"
)

// mockRPCClientCheckFinalizedStateAvailabilityCall mirrors mockery-generated *Call types for
// CheckFinalizedStateAvailability (added manually on mockRPCClient, not on RPCClient interface).
// Named without underscores to satisfy revive var-naming (mockery-generated code uses underscores).
type mockRPCClientCheckFinalizedStateAvailabilityCall[CHAIN_ID ID, HEAD Head] struct {
*mock.Call
}

// CheckFinalizedStateAvailability is a helper to define EXPECT().CheckFinalizedStateAvailability(...).
func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}) *mockRPCClientCheckFinalizedStateAvailabilityCall[CHAIN_ID, HEAD] {
return &mockRPCClientCheckFinalizedStateAvailabilityCall[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx)}
}

func (_c *mockRPCClientCheckFinalizedStateAvailabilityCall[CHAIN_ID, HEAD]) Return(err error) *mockRPCClientCheckFinalizedStateAvailabilityCall[CHAIN_ID, HEAD] {
_c.Call.Return(err)
return _c
}

// CheckFinalizedStateAvailability extends mockRPCClient to also satisfy FinalizedStateChecker,
// so NewNode populates finalizedStateChecker for tests that exercise finalized-state checks.
func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error {
ret := _m.Called(ctx)

if len(ret) == 0 {
panic("no return value specified for CheckFinalizedStateAvailability")
}

var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}

return r0
}
24 changes: 21 additions & 3 deletions multinode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ type ChainConfig interface {
FinalizedBlockOffset() uint32
}

// FinalizedStateCheckConfig is an optional interface for enabling finalized state availability checking.
// It is optional (not part of NodeConfig) so non-EVM multinode consumers are not forced to extend their config types; see aliveLoop in node_lifecycle.go.
type FinalizedStateCheckConfig interface {
FinalizedStateCheckFailureThreshold() uint32
}

// FinalizedStateChecker is an optional interface for RPCClients that support finalized state checks.
// It is optional (not part of RPCClient) so non-EVM multinode consumers avoid boilerplate and review churn; see aliveLoop in node_lifecycle.go.
type FinalizedStateChecker interface {
CheckFinalizedStateAvailability(ctx context.Context) error
}

type nodeMetrics interface {
IncrementNodeVerifies(ctx context.Context, nodeName string)
IncrementNodeVerifiesFailed(ctx context.Context, nodeName string)
Expand All @@ -49,13 +61,15 @@ type nodeMetrics interface {
IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string)
IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string)
IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string)
IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string)
RecordNodeClientVersion(ctx context.Context, nodeName string, version string)
SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64)
SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64)
IncrementSeenBlocks(ctx context.Context, nodeName string)
IncrementPolls(ctx context.Context, nodeName string)
IncrementPollsFailed(ctx context.Context, nodeName string)
IncrementPollsSuccess(ctx context.Context, nodeName string)
IncrementFinalizedStateFailed(ctx context.Context, nodeName string)
}

type Node[
Expand Down Expand Up @@ -106,8 +120,9 @@ type node[
ws *url.URL
http *url.URL

rpc RPC
isLoadBalancedRPC bool
rpc RPC
finalizedStateChecker FinalizedStateChecker // set in NewNode when rpc implements FinalizedStateChecker
isLoadBalancedRPC bool

stateMu sync.RWMutex // protects state* fields
state nodeState
Expand Down Expand Up @@ -165,6 +180,9 @@ func NewNode[
)
n.lfcLog = logger.Named(lggr, "Lifecycle")
n.rpc = rpc
if fs, ok := any(rpc).(FinalizedStateChecker); ok {
n.finalizedStateChecker = fs
}
n.isLoadBalancedRPC = isLoadBalancedRPC
n.chainFamily = chainFamily
return n
Expand Down Expand Up @@ -274,7 +292,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg
// The node is already closed, and any subsequent transition is invalid.
// To make spotting such transitions a bit easier, return the invalid node state.
return nodeStateLen
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
default:
panic(fmt.Sprintf("cannot verify node in state %v", st))
}
Expand Down
41 changes: 37 additions & 4 deletions multinode/node_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func (n nodeState) String() string {
return "Syncing"
case nodeStateFinalizedBlockOutOfSync:
return "FinalizedBlockOutOfSync"
case nodeStateFinalizedStateNotAvailable:
return "FinalizedStateNotAvailable"
default:
return fmt.Sprintf("nodeState(%d)", n)
}
Expand Down Expand Up @@ -72,6 +74,8 @@ const (
nodeStateSyncing
// nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block
nodeStateFinalizedBlockOutOfSync
// nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block
nodeStateFinalizedStateNotAvailable
// nodeStateLen tracks the number of states
nodeStateLen
)
Expand Down Expand Up @@ -182,7 +186,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand Down Expand Up @@ -266,7 +270,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
return
}
switch n.state {
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateUnreachable
default:
Expand All @@ -288,6 +292,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) {
n.declareSyncing()
case nodeStateAlive:
n.declareAlive()
case nodeStateFinalizedStateNotAvailable:
n.declareFinalizedStateNotAvailable()
default:
panic(fmt.Sprintf("%#v state declaration is not implemented", state))
}
Expand All @@ -311,7 +317,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing:
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateInvalidChainID
default:
Expand All @@ -338,7 +344,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateSyncing
default:
Expand All @@ -351,6 +357,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() {
n.transitionToFinalizedStateNotAvailable(func() {
n.lfcLog.Errorw("RPC Node cannot serve finalized state", "nodeState", n.state)
n.wg.Add(1)
go n.finalizedStateNotAvailableLoop()
})
}

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) {
ctx, cancel := n.stopCh.NewCtx()
defer cancel()
n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name)
n.stateMu.Lock()
defer n.stateMu.Unlock()
if n.state == nodeStateClosed {
return
}
switch n.state {
case nodeStateAlive:
n.rpc.Close()
n.state = nodeStateFinalizedStateNotAvailable
default:
panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable))
}
fn()
}

func transitionFail(from nodeState, to nodeState) string {
return fmt.Sprintf("cannot transition from %#v to %#v", from, to)
}
Loading
Loading