Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (

type CCIPProviderClient struct {
*goplugin.ServiceClient
chainAccessor ccipocr3.ChainAccessor
chainAccessor *ChainAccessorClient
contractTransmitter ocr3types.ContractTransmitter[[]byte]
chainSpecificAddressCodec ccipocr3.ChainSpecificAddressCodec
commitPluginCodec ccipocr3.CommitPluginCodec
Expand All @@ -31,7 +31,7 @@ type CCIPProviderClient struct {
sourceChainExtraDataCodec ccipocr3.SourceChainExtraDataCodec
}

func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) types.CCIPProvider {
func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *CCIPProviderClient {
c := &CCIPProviderClient{
ServiceClient: goplugin.NewServiceClient(b.WithName("CCIPProviderClient"), cc),
}
Expand All @@ -52,6 +52,10 @@ func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) types.
return c
}

func (p *CCIPProviderClient) GetSyncs() []*ccipocr3pb.SyncRequest {
return p.chainAccessor.GetSyncs()
}

func (p *CCIPProviderClient) ChainAccessor() ccipocr3.ChainAccessor {
return p.chainAccessor
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ccipocr3

import (
"context"
"slices"
"sync"
"time"

"google.golang.org/grpc"
Expand All @@ -14,22 +16,31 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
)

var _ ccipocr3.ChainAccessor = (*chainAccessorClient)(nil)
var _ ccipocr3.ChainAccessor = (*ChainAccessorClient)(nil)

type chainAccessorClient struct {
type ChainAccessorClient struct {
*net.BrokerExt
grpc ccipocr3pb.ChainAccessorClient

mu sync.RWMutex
syncs []*ccipocr3pb.SyncRequest
}

func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) ccipocr3.ChainAccessor {
return &chainAccessorClient{
func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) *ChainAccessorClient {
return &ChainAccessorClient{
BrokerExt: broker,
grpc: ccipocr3pb.NewChainAccessorClient(cc),
}
}

func (c *ChainAccessorClient) GetSyncs() []*ccipocr3pb.SyncRequest {
c.mu.RLock()
defer c.mu.RUnlock()
return slices.Clone(c.syncs)
}

// AllAccessors methods
func (c *chainAccessorClient) GetContractAddress(contractName string) ([]byte, error) {
func (c *ChainAccessorClient) GetContractAddress(contractName string) ([]byte, error) {
resp, err := c.grpc.GetContractAddress(context.Background(), &ccipocr3pb.GetContractAddressRequest{
ContractName: contractName,
})
Expand All @@ -39,7 +50,7 @@ func (c *chainAccessorClient) GetContractAddress(contractName string) ([]byte, e
return resp.Address, nil
}

func (c *chainAccessorClient) GetAllConfigsLegacy(
func (c *ChainAccessorClient) GetAllConfigsLegacy(
ctx context.Context,
destChainSelector ccipocr3.ChainSelector,
sourceChainSelectors []ccipocr3.ChainSelector,
Expand All @@ -66,7 +77,7 @@ func (c *chainAccessorClient) GetAllConfigsLegacy(
return pbToChainConfigSnapshotDetailed(resp.Snapshot), sourceConfigs, nil
}

func (c *chainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipocr3.ChainFeeComponents, error) {
func (c *ChainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipocr3.ChainFeeComponents, error) {
resp, err := c.grpc.GetChainFeeComponents(ctx, &emptypb.Empty{})
if err != nil {
return ccipocr3.ChainFeeComponents{}, err
Expand All @@ -77,16 +88,19 @@ func (c *chainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipoc
}, nil
}

func (c *chainAccessorClient) Sync(ctx context.Context, contractName string, contractAddress ccipocr3.UnknownAddress) error {
_, err := c.grpc.Sync(ctx, &ccipocr3pb.SyncRequest{
ContractName: contractName,
ContractAddress: contractAddress,
})
func (c *ChainAccessorClient) Sync(ctx context.Context, contractName string, contractAddress ccipocr3.UnknownAddress) error {
req := &ccipocr3pb.SyncRequest{ContractName: contractName, ContractAddress: contractAddress}
_, err := c.grpc.Sync(ctx, req)
if err != nil {
c.mu.Lock()
c.syncs = append(c.syncs, req) // TODO dedupe?
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just using a map is simplest. I was trying to maintain the protobuf caching but that is nbd.

c.mu.Unlock()
}
return err
}

// DestinationAccessor methods
func (c *chainAccessorClient) CommitReportsGTETimestamp(
func (c *ChainAccessorClient) CommitReportsGTETimestamp(
ctx context.Context,
ts time.Time,
confidence primitives.ConfidenceLevel,
Expand All @@ -112,7 +126,7 @@ func (c *chainAccessorClient) CommitReportsGTETimestamp(
return reports, nil
}

func (c *chainAccessorClient) ExecutedMessages(
func (c *ChainAccessorClient) ExecutedMessages(
ctx context.Context,
ranges map[ccipocr3.ChainSelector][]ccipocr3.SeqNumRange,
confidence primitives.ConfidenceLevel,
Expand Down Expand Up @@ -149,7 +163,7 @@ func (c *chainAccessorClient) ExecutedMessages(
return result, nil
}

func (c *chainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3.ChainSelector) (map[ccipocr3.ChainSelector]ccipocr3.SeqNum, error) {
func (c *ChainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3.ChainSelector) (map[ccipocr3.ChainSelector]ccipocr3.SeqNum, error) {
var chainSelectors []uint64
for _, source := range sources {
chainSelectors = append(chainSelectors, uint64(source))
Expand All @@ -169,7 +183,7 @@ func (c *chainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3
return result, nil
}

func (c *chainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3.ChainSelector][]ccipocr3.UnknownEncodedAddress) (map[ccipocr3.ChainSelector]map[string]uint64, error) {
func (c *ChainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3.ChainSelector][]ccipocr3.UnknownEncodedAddress) (map[ccipocr3.ChainSelector]map[string]uint64, error) {
req := &ccipocr3pb.NoncesRequest{
Addresses: make(map[uint64]*ccipocr3pb.UnknownEncodedAddressList),
}
Expand All @@ -194,7 +208,7 @@ func (c *chainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3
return result, nil
}

func (c *chainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, selectors []ccipocr3.ChainSelector) map[ccipocr3.ChainSelector]ccipocr3.TimestampedBig {
func (c *ChainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, selectors []ccipocr3.ChainSelector) map[ccipocr3.ChainSelector]ccipocr3.TimestampedBig {
var chainSelectors []uint64
for _, sel := range selectors {
chainSelectors = append(chainSelectors, uint64(sel))
Expand All @@ -219,7 +233,7 @@ func (c *chainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, select
return result
}

func (c *chainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64, error) {
func (c *ChainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64, error) {
resp, err := c.grpc.GetLatestPriceSeqNr(ctx, &emptypb.Empty{})
if err != nil {
return 0, err
Expand All @@ -228,7 +242,7 @@ func (c *chainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64,
}

// SourceAccessor methods
func (c *chainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipocr3.ChainSelector, seqNumRange ccipocr3.SeqNumRange) ([]ccipocr3.Message, error) {
func (c *ChainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipocr3.ChainSelector, seqNumRange ccipocr3.SeqNumRange) ([]ccipocr3.Message, error) {
resp, err := c.grpc.MsgsBetweenSeqNums(ctx, &ccipocr3pb.MsgsBetweenSeqNumsRequest{
DestChainSelector: uint64(dest),
SeqNumRange: &ccipocr3pb.SeqNumRange{
Expand All @@ -247,7 +261,7 @@ func (c *chainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipo
return messages, nil
}

func (c *chainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) {
func (c *ChainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) {
resp, err := c.grpc.LatestMessageTo(ctx, &ccipocr3pb.LatestMessageToRequest{
DestChainSelector: uint64(dest),
})
Expand All @@ -257,7 +271,7 @@ func (c *chainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3
return ccipocr3.SeqNum(resp.SeqNum), nil
}

func (c *chainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) {
func (c *ChainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) {
resp, err := c.grpc.GetExpectedNextSequenceNumber(ctx, &ccipocr3pb.GetExpectedNextSequenceNumberRequest{
DestChainSelector: uint64(dest),
})
Expand All @@ -267,7 +281,7 @@ func (c *chainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context,
return ccipocr3.SeqNum(resp.SeqNum), nil
}

func (c *chainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccipocr3.UnknownAddress) (ccipocr3.TimestampedUnixBig, error) {
func (c *ChainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccipocr3.UnknownAddress) (ccipocr3.TimestampedUnixBig, error) {
resp, err := c.grpc.GetTokenPriceUSD(ctx, &ccipocr3pb.GetTokenPriceUSDRequest{
Address: address,
})
Expand All @@ -280,7 +294,7 @@ func (c *chainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccip
}, nil
}

func (c *chainAccessorClient) GetFeeQuoterDestChainConfig(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.FeeQuoterDestChainConfig, error) {
func (c *ChainAccessorClient) GetFeeQuoterDestChainConfig(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.FeeQuoterDestChainConfig, error) {
resp, err := c.grpc.GetFeeQuoterDestChainConfig(ctx, &ccipocr3pb.GetFeeQuoterDestChainConfigRequest{
DestChainSelector: uint64(dest),
})
Expand Down Expand Up @@ -353,7 +367,7 @@ func (s *chainAccessorServer) GetChainFeeComponents(ctx context.Context, req *em
}

func (s *chainAccessorServer) Sync(ctx context.Context, req *ccipocr3pb.SyncRequest) (*emptypb.Empty, error) {
err := s.impl.Sync(ctx, req.ContractName, ccipocr3.UnknownAddress(req.ContractAddress))
err := s.impl.Sync(ctx, req.ContractName, req.ContractAddress)
return &emptypb.Empty{}, err
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/loop/internal/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@
}

func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) {
var ccipProvider *ccipocr3.CCIPProviderClient
cc := r.NewClientConn("CCIPProvider", func(ctx context.Context) (uint32, net.Resources, error) {
reply, err := r.relayer.NewCCIPProvider(ctx, &pb.NewCCIPProviderRequest{
CcipProviderArgs: &pb.CCIPProviderArgs{
Expand All @@ -297,6 +298,7 @@
ChainWriterConfig: cargs.ChainWriterConfig,
OffRampAddress: cargs.OffRampAddress,
PluginType: cargs.PluginType,
Syncs: ccipProvider.GetSyncs(), // restore state
},
})
if err != nil {
Expand All @@ -305,7 +307,8 @@
return reply.CcipProviderID, nil, nil
})

return ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), cc), nil
ccipProvider = ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), cc)
return ccipProvider, nil
}

func (r *relayerClient) LatestHead(ctx context.Context) (types.Head, error) {
Expand Down Expand Up @@ -735,6 +738,13 @@
return nil, err
}

for _, s := range rargs.Syncs {
err = provider.ChainAccessor().Sync(ctx, ...)

Check failure on line 742 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / build-test

syntax error: unexpected ..., expected expression

Check failure on line 742 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / build-race-tests

syntax error: unexpected ..., expected expression

Check failure on line 742 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

syntax error: unexpected ..., expected expression
if err != nil {
return nil, err
}
}

const name = "CCIPProvider"
id, _, err := r.ServeNew(name, func(s *grpc.Server) {
ccipocr3.RegisterProviderServices(s, provider)
Expand Down
Loading