Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions core/capabilities/ccip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

kcr "github.com/smartcontractkit/chainlink-evm/gethwrappers/keystone/generated/capabilities_registry_1_1_0"
"github.com/smartcontractkit/chainlink-evm/pkg/config/toml"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/common"
configsevm "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/configs/evm"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/launcher"
Expand All @@ -51,7 +52,7 @@ import (

type RelayGetter interface {
Get(types.RelayID) (loop.Relayer, error)
GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error)
GetIDToRelayerMap() map[types.RelayID]loop.Relayer
}

type Keystore[K keystore.Key] interface {
Expand Down Expand Up @@ -145,10 +146,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
return nil, err
}

allRelayers, err := d.relayers.GetIDToRelayerMap()
if err != nil {
return nil, fmt.Errorf("could not fetch all relayers: %w", err)
}
allRelayers := d.relayers.GetIDToRelayerMap()
transmitterKeys, err := d.getTransmitterKeys(ctx, maps.Keys(allRelayers))
if err != nil {
return nil, err
Expand Down
12 changes: 7 additions & 5 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,15 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
)
srvcs = append(srvcs, workflowORM)

promReporter := headreporter.NewPrometheusReporter(opts.DS, legacyEVMChains)
chainIDs := make([]*big.Int, legacyEVMChains.Len())
promReporter := headreporter.NewLegacyEVMPrometheusReporter(opts.DS, legacyEVMChains)
evmChainIDs := make([]*big.Int, legacyEVMChains.Len())
for i, chain := range legacyEVMChains.Slice() {
chainIDs[i] = chain.ID()
evmChainIDs[i] = chain.ID()
}
telemReporter := headreporter.NewTelemetryReporter(telemetryManager, globalLogger, chainIDs...)
headReporter := headreporter.NewHeadReporterService(opts.DS, globalLogger, promReporter, telemReporter)

legacyEVMTelemReporter := headreporter.NewLegacyEVMTelemetryReporter(telemetryManager, globalLogger, evmChainIDs...)
loopTelemReporter := headreporter.NewTelemetryReporter(telemetryManager, globalLogger, relayChainInterops.GetIDToRelayerMap())
headReporter := headreporter.NewHeadReporterService(opts.DS, globalLogger, promReporter, legacyEVMTelemReporter, loopTelemReporter)
srvcs = append(srvcs, headReporter)
for _, chain := range legacyEVMChains.Slice() {
chain.HeadBroadcaster().Subscribe(headReporter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (f *FakeRelayerChainInteroperators) Get(id types.RelayID) (loop.Relayer, er
return r, nil
}

func (f *FakeRelayerChainInteroperators) GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error) {
return f.Relayers, nil
func (f *FakeRelayerChainInteroperators) GetIDToRelayerMap() map[types.RelayID]loop.Relayer {
return f.Relayers
}

func (f *FakeRelayerChainInteroperators) Slice() []loop.Relayer {
Expand Down
7 changes: 4 additions & 3 deletions core/services/chainlink/relayer_chain_interoperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/chains"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/services"
Expand Down Expand Up @@ -244,15 +245,15 @@ func (rs *CoreRelayerChainInteroperators) Get(id types.RelayID) (loop.Relayer, e
return lr, nil
}

func (rs *CoreRelayerChainInteroperators) GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error) {
func (rs *CoreRelayerChainInteroperators) GetIDToRelayerMap() map[types.RelayID]loop.Relayer {
rs.mu.Lock()
defer rs.mu.Unlock()
result := make(map[types.RelayID]loop.Relayer)
for id, relayer := range rs.loopRelayers {
result[id] = relayer
}

return result, nil
return result
}

// LegacyEVMChains returns a container with all the evm chains
Expand Down Expand Up @@ -380,7 +381,7 @@ func FilterRelayersByType(network string) func(id types.RelayID) bool {
}

// List returns all the [RelayerChainInteroperators] that match the [FilterFn].
// A typical usage pattern to use [List] with [FilterByType] to obtain a set of [RelayerChainInteroperators]
// A typical usage pattern to use [List] with [FilterRelayersByType] to obtain a set of [RelayerChainInteroperators]
// for a given chain
func (rs *CoreRelayerChainInteroperators) List(filter FilterFn) RelayerChainInteroperators {
matches := make(map[types.RelayID]loop.Relayer)
Expand Down
3 changes: 2 additions & 1 deletion core/services/headreporter/prometheus_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
evmtypes "github.com/smartcontractkit/chainlink-evm/pkg/types"
txmgrcommon "github.com/smartcontractkit/chainlink-framework/chains/txmgr"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ var (
})
)

func NewPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer) *prometheusReporter {
func NewLegacyEVMPrometheusReporter(ds sqlutil.DataSource, chainContainer legacyevm.LegacyChainContainer) *prometheusReporter {
return &prometheusReporter{
ds: ds,
chains: chainContainer,
Expand Down
8 changes: 4 additions & 4 deletions core/services/headreporter/prometheus_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Test_PrometheusReporter(t *testing.T) {
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return()

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter := headreporter.NewLegacyEVMPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter.SetBackend(backend)

head := headreporter.NewHead()
Expand All @@ -52,7 +52,7 @@ func Test_PrometheusReporter(t *testing.T) {
db := pgtest.NewSqlxDB(t)
backend := headreporter.NewMockPrometheusBackend(t)

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainerWithNullTxm(t))
reporter := headreporter.NewLegacyEVMPrometheusReporter(db, newLegacyChainContainerWithNullTxm(t))
reporter.SetBackend(backend)

head := headreporter.NewHead()
Expand All @@ -78,7 +78,7 @@ func Test_PrometheusReporter(t *testing.T) {
})).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(35)).Return()

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter := headreporter.NewLegacyEVMPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter.SetBackend(backend)

head := headreporter.NewHead()
Expand All @@ -105,7 +105,7 @@ func Test_PrometheusReporter(t *testing.T) {
backend.On("SetMaxUnconfirmedAge", big.NewInt(0), float64(0)).Return()
backend.On("SetMaxUnconfirmedBlocks", big.NewInt(0), int64(0)).Return()

reporter := headreporter.NewPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter := headreporter.NewLegacyEVMPrometheusReporter(db, newLegacyChainContainer(t, db))
reporter.SetBackend(backend)

head := headreporter.NewHead()
Expand Down
97 changes: 85 additions & 12 deletions core/services/headreporter/telemetry_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,65 @@ package headreporter

import (
"context"
"encoding/hex"
"fmt"
"math/big"
"strconv"

"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/libocr/commontypes"
"google.golang.org/protobuf/proto"

evmtypes "github.com/smartcontractkit/chainlink-evm/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

type telemetryReporter struct {
type legacyEVMTelemetryReporter struct {
lggr logger.Logger
endpoints map[uint64]commontypes.MonitoringEndpoint
}

func NewTelemetryReporter(monitoringEndpointGen telemetry.MonitoringEndpointGenerator, lggr logger.Logger, chainIDs ...*big.Int) HeadReporter {
func NewLegacyEVMTelemetryReporter(monitoringEndpointGen telemetry.MonitoringEndpointGenerator, lggr logger.Logger, chainIDs ...*big.Int) HeadReporter {
endpoints := make(map[uint64]commontypes.MonitoringEndpoint)
for _, chainID := range chainIDs {
endpoints[chainID.Uint64()] = monitoringEndpointGen.GenMonitoringEndpoint("EVM", chainID.String(), "", synchronization.HeadReport)
}
return &telemetryReporter{lggr: lggr.Named("TelemetryReporter"), endpoints: endpoints}
return &legacyEVMTelemetryReporter{lggr: lggr.Named("TelemetryReporter"), endpoints: endpoints}
}

func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
func (t *legacyEVMTelemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.Head) error {
monitoringEndpoint := t.endpoints[head.EVMChainID.ToInt().Uint64()]
if monitoringEndpoint == nil {
return errors.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64())
return fmt.Errorf("No monitoring endpoint provided chain_id=%d", head.EVMChainID.Int64())
}
var finalized *telem.Block
latestFinalizedHead := head.LatestFinalizedHead()
if latestFinalizedHead != nil {
finalized = &telem.Block{
Timestamp: uint64(latestFinalizedHead.GetTimestamp().UTC().Unix()),
Number: uint64(latestFinalizedHead.BlockNumber()),
Timestamp: utils.NonNegativeInt64ToUint64(latestFinalizedHead.GetTimestamp().UTC().Unix()),
Number: utils.NonNegativeInt64ToUint64(latestFinalizedHead.BlockNumber()),
Hash: latestFinalizedHead.BlockHash().Hex(),
}
}
request := &telem.HeadReportRequest{
ChainID: head.EVMChainID.String(),
Latest: &telem.Block{
Timestamp: uint64(head.Timestamp.UTC().Unix()),
Number: uint64(head.Number),
Timestamp: utils.NonNegativeInt64ToUint64(head.Timestamp.UTC().Unix()),
Number: utils.NonNegativeInt64ToUint64(head.Number),
Hash: head.Hash.Hex(),
},
Finalized: finalized,
}
bytes, err := proto.Marshal(request)
if err != nil {
return errors.WithMessage(err, "telem.HeadReportRequest marshal error")
return fmt.Errorf("telem.HeadReportRequest marshal error: %w", err)
}
monitoringEndpoint.SendLog(bytes)
if finalized == nil {
Expand All @@ -64,6 +70,73 @@ func (t *telemetryReporter) ReportNewHead(ctx context.Context, head *evmtypes.He
return nil
}

func (t *telemetryReporter) ReportPeriodic(ctx context.Context) error {
func (t *legacyEVMTelemetryReporter) ReportPeriodic(_ context.Context) error {
return nil
}

type loopTelemetryReporter struct {
lggr logger.Logger
endpoints map[types.RelayID]commontypes.MonitoringEndpoint
relays map[types.RelayID]loop.Relayer
}

// NewTelemetryReporter creates a new telemetry reporter for each relayer
func NewTelemetryReporter(monitoringEndpointGen telemetry.MonitoringEndpointGenerator, lggr logger.Logger, relayers map[types.RelayID]loop.Relayer) HeadReporter {
if relayers == nil {
return nil
}
endpoints := make(map[types.RelayID]commontypes.MonitoringEndpoint)
for relayID := range relayers {
endpoints[relayID] = monitoringEndpointGen.GenMonitoringEndpoint(relayID.Network, relayID.ChainID, "", synchronization.HeadReport)
}
return &loopTelemetryReporter{lggr: lggr.Named("TelemetryReporter"), endpoints: endpoints, relays: relayers}
}

// ReportNewHead is unimplemented on Solana because there is no Headtracker to subscribe to
func (t *loopTelemetryReporter) ReportNewHead(_ context.Context, _ *evmtypes.Head) error {
return nil
}

// ReportPeriodic is used on Solana to report the latest head
func (t *loopTelemetryReporter) ReportPeriodic(ctx context.Context) error {
for relayID, endpoint := range t.endpoints {
relay, ok := t.relays[relayID]
if !ok {
return fmt.Errorf("no relay found for Solana chain_id=%s", relayID.ChainID)
}
err := reportLatestHead(ctx, endpoint, relayID.ChainID, relay)
if err != nil {
return err
}
}

return nil
}

func reportLatestHead(ctx context.Context, endpoint commontypes.MonitoringEndpoint, chainID string, relay loop.Relayer) error {
head, err := relay.LatestHead(ctx)
if err != nil {
return fmt.Errorf("failed getting Solana head for chainID %s: %w", chainID, err)
}

blockNum, err := strconv.ParseUint(head.Height, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse Solana block height %s: %w", head.Height, err)
}

request := &telem.HeadReportRequest{
ChainID: chainID,
Latest: &telem.Block{
Timestamp: head.Timestamp,
Number: blockNum,
Hash: hex.EncodeToString(head.Hash),
},
Finalized: nil, // latest finalized head retrieval not supported by Solana relayer yet
}
bytes, err := proto.Marshal(request)
if err != nil {
return fmt.Errorf("telem.HeadReportRequest marshal error: %w", err)
}
endpoint.SendLog(bytes)
return nil
}
Loading
Loading