From 8679eb6f3c98546ba780bc9fe4d7af16cb2aa9ed Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 9 Sep 2025 07:52:50 -0500 Subject: [PATCH 1/2] pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3: resync on CCIPProvider reconnect --- .../ext/ccipocr3/ccip_provider.go | 8 ++- .../ext/ccipocr3/chainaccessor.go | 62 ++++++++++++------- pkg/loop/internal/relayer/relayer.go | 5 +- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go index a705678aa..68ffdbcf8 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go @@ -22,7 +22,7 @@ var ( type CCIPProviderClient struct { *goplugin.ServiceClient - chainAccessor ccipocr3.ChainAccessor + chainAccessor *ChainAccessorClient contractTransmitter ocr3types.ContractTransmitter[[]byte] chainSpecificAddressCodec ccipocr3.ChainSpecificAddressCodec commitPluginCodec ccipocr3.CommitPluginCodec @@ -31,7 +31,7 @@ type CCIPProviderClient struct { sourceChainExtraDataCodec ccipocr3.SourceChainExtraDataCodec } -func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) types.CCIPProvider { +func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *CCIPProviderClient { c := &CCIPProviderClient{ ServiceClient: goplugin.NewServiceClient(b.WithName("CCIPProviderClient"), cc), } @@ -52,6 +52,10 @@ func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) types. return c } +func (p *CCIPProviderClient) GetSyncs() []*ccipocr3pb.SyncRequest { + return p.chainAccessor.GetSyncs() +} + func (p *CCIPProviderClient) ChainAccessor() ccipocr3.ChainAccessor { return p.chainAccessor } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go index ea402484c..41d698053 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go @@ -2,6 +2,8 @@ package ccipocr3 import ( "context" + "slices" + "sync" "time" "google.golang.org/grpc" @@ -14,22 +16,31 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" ) -var _ ccipocr3.ChainAccessor = (*chainAccessorClient)(nil) +var _ ccipocr3.ChainAccessor = (*ChainAccessorClient)(nil) -type chainAccessorClient struct { +type ChainAccessorClient struct { *net.BrokerExt grpc ccipocr3pb.ChainAccessorClient + + mu sync.RWMutex + syncs []*ccipocr3pb.SyncRequest } -func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) ccipocr3.ChainAccessor { - return &chainAccessorClient{ +func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) *ChainAccessorClient { + return &ChainAccessorClient{ BrokerExt: broker, grpc: ccipocr3pb.NewChainAccessorClient(cc), } } +func (c *ChainAccessorClient) GetSyncs() []*ccipocr3pb.SyncRequest { + c.mu.RLock() + defer c.mu.RUnlock() + return slices.Clone(c.syncs) +} + // AllAccessors methods -func (c *chainAccessorClient) GetContractAddress(contractName string) ([]byte, error) { +func (c *ChainAccessorClient) GetContractAddress(contractName string) ([]byte, error) { resp, err := c.grpc.GetContractAddress(context.Background(), &ccipocr3pb.GetContractAddressRequest{ ContractName: contractName, }) @@ -39,7 +50,7 @@ func (c *chainAccessorClient) GetContractAddress(contractName string) ([]byte, e return resp.Address, nil } -func (c *chainAccessorClient) GetAllConfigsLegacy( +func (c *ChainAccessorClient) GetAllConfigsLegacy( ctx context.Context, destChainSelector ccipocr3.ChainSelector, sourceChainSelectors []ccipocr3.ChainSelector, @@ -66,7 +77,7 @@ func (c *chainAccessorClient) GetAllConfigsLegacy( return pbToChainConfigSnapshotDetailed(resp.Snapshot), sourceConfigs, nil } -func (c *chainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipocr3.ChainFeeComponents, error) { +func (c *ChainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipocr3.ChainFeeComponents, error) { resp, err := c.grpc.GetChainFeeComponents(ctx, &emptypb.Empty{}) if err != nil { return ccipocr3.ChainFeeComponents{}, err @@ -77,16 +88,19 @@ func (c *chainAccessorClient) GetChainFeeComponents(ctx context.Context) (ccipoc }, nil } -func (c *chainAccessorClient) Sync(ctx context.Context, contractName string, contractAddress ccipocr3.UnknownAddress) error { - _, err := c.grpc.Sync(ctx, &ccipocr3pb.SyncRequest{ - ContractName: contractName, - ContractAddress: contractAddress, - }) +func (c *ChainAccessorClient) Sync(ctx context.Context, contractName string, contractAddress ccipocr3.UnknownAddress) error { + req := &ccipocr3pb.SyncRequest{ContractName: contractName, ContractAddress: contractAddress} + _, err := c.grpc.Sync(ctx, req) + if err != nil { + c.mu.Lock() + c.syncs = append(c.syncs, req) // TODO dedupe? + c.mu.Unlock() + } return err } // DestinationAccessor methods -func (c *chainAccessorClient) CommitReportsGTETimestamp( +func (c *ChainAccessorClient) CommitReportsGTETimestamp( ctx context.Context, ts time.Time, confidence primitives.ConfidenceLevel, @@ -112,7 +126,7 @@ func (c *chainAccessorClient) CommitReportsGTETimestamp( return reports, nil } -func (c *chainAccessorClient) ExecutedMessages( +func (c *ChainAccessorClient) ExecutedMessages( ctx context.Context, ranges map[ccipocr3.ChainSelector][]ccipocr3.SeqNumRange, confidence primitives.ConfidenceLevel, @@ -149,7 +163,7 @@ func (c *chainAccessorClient) ExecutedMessages( return result, nil } -func (c *chainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3.ChainSelector) (map[ccipocr3.ChainSelector]ccipocr3.SeqNum, error) { +func (c *ChainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3.ChainSelector) (map[ccipocr3.ChainSelector]ccipocr3.SeqNum, error) { var chainSelectors []uint64 for _, source := range sources { chainSelectors = append(chainSelectors, uint64(source)) @@ -169,7 +183,7 @@ func (c *chainAccessorClient) NextSeqNum(ctx context.Context, sources []ccipocr3 return result, nil } -func (c *chainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3.ChainSelector][]ccipocr3.UnknownEncodedAddress) (map[ccipocr3.ChainSelector]map[string]uint64, error) { +func (c *ChainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3.ChainSelector][]ccipocr3.UnknownEncodedAddress) (map[ccipocr3.ChainSelector]map[string]uint64, error) { req := &ccipocr3pb.NoncesRequest{ Addresses: make(map[uint64]*ccipocr3pb.UnknownEncodedAddressList), } @@ -194,7 +208,7 @@ func (c *chainAccessorClient) Nonces(ctx context.Context, addresses map[ccipocr3 return result, nil } -func (c *chainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, selectors []ccipocr3.ChainSelector) map[ccipocr3.ChainSelector]ccipocr3.TimestampedBig { +func (c *ChainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, selectors []ccipocr3.ChainSelector) map[ccipocr3.ChainSelector]ccipocr3.TimestampedBig { var chainSelectors []uint64 for _, sel := range selectors { chainSelectors = append(chainSelectors, uint64(sel)) @@ -219,7 +233,7 @@ func (c *chainAccessorClient) GetChainFeePriceUpdate(ctx context.Context, select return result } -func (c *chainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64, error) { +func (c *ChainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64, error) { resp, err := c.grpc.GetLatestPriceSeqNr(ctx, &emptypb.Empty{}) if err != nil { return 0, err @@ -228,7 +242,7 @@ func (c *chainAccessorClient) GetLatestPriceSeqNr(ctx context.Context) (uint64, } // SourceAccessor methods -func (c *chainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipocr3.ChainSelector, seqNumRange ccipocr3.SeqNumRange) ([]ccipocr3.Message, error) { +func (c *ChainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipocr3.ChainSelector, seqNumRange ccipocr3.SeqNumRange) ([]ccipocr3.Message, error) { resp, err := c.grpc.MsgsBetweenSeqNums(ctx, &ccipocr3pb.MsgsBetweenSeqNumsRequest{ DestChainSelector: uint64(dest), SeqNumRange: &ccipocr3pb.SeqNumRange{ @@ -247,7 +261,7 @@ func (c *chainAccessorClient) MsgsBetweenSeqNums(ctx context.Context, dest ccipo return messages, nil } -func (c *chainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) { +func (c *ChainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) { resp, err := c.grpc.LatestMessageTo(ctx, &ccipocr3pb.LatestMessageToRequest{ DestChainSelector: uint64(dest), }) @@ -257,7 +271,7 @@ func (c *chainAccessorClient) LatestMessageTo(ctx context.Context, dest ccipocr3 return ccipocr3.SeqNum(resp.SeqNum), nil } -func (c *chainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) { +func (c *ChainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.SeqNum, error) { resp, err := c.grpc.GetExpectedNextSequenceNumber(ctx, &ccipocr3pb.GetExpectedNextSequenceNumberRequest{ DestChainSelector: uint64(dest), }) @@ -267,7 +281,7 @@ func (c *chainAccessorClient) GetExpectedNextSequenceNumber(ctx context.Context, return ccipocr3.SeqNum(resp.SeqNum), nil } -func (c *chainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccipocr3.UnknownAddress) (ccipocr3.TimestampedUnixBig, error) { +func (c *ChainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccipocr3.UnknownAddress) (ccipocr3.TimestampedUnixBig, error) { resp, err := c.grpc.GetTokenPriceUSD(ctx, &ccipocr3pb.GetTokenPriceUSDRequest{ Address: address, }) @@ -280,7 +294,7 @@ func (c *chainAccessorClient) GetTokenPriceUSD(ctx context.Context, address ccip }, nil } -func (c *chainAccessorClient) GetFeeQuoterDestChainConfig(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.FeeQuoterDestChainConfig, error) { +func (c *ChainAccessorClient) GetFeeQuoterDestChainConfig(ctx context.Context, dest ccipocr3.ChainSelector) (ccipocr3.FeeQuoterDestChainConfig, error) { resp, err := c.grpc.GetFeeQuoterDestChainConfig(ctx, &ccipocr3pb.GetFeeQuoterDestChainConfigRequest{ DestChainSelector: uint64(dest), }) @@ -353,7 +367,7 @@ func (s *chainAccessorServer) GetChainFeeComponents(ctx context.Context, req *em } func (s *chainAccessorServer) Sync(ctx context.Context, req *ccipocr3pb.SyncRequest) (*emptypb.Empty, error) { - err := s.impl.Sync(ctx, req.ContractName, ccipocr3.UnknownAddress(req.ContractAddress)) + err := s.impl.Sync(ctx, req.ContractName, req.ContractAddress) return &emptypb.Empty{}, err } diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 3c4b7f092..39ad4b5cc 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -289,6 +289,7 @@ func (r *relayerClient) NewLLOProvider(ctx context.Context, rargs types.RelayArg } func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { + var ccipProvider *ccipocr3.CCIPProviderClient cc := r.NewClientConn("CCIPProvider", func(ctx context.Context) (uint32, net.Resources, error) { reply, err := r.relayer.NewCCIPProvider(ctx, &pb.NewCCIPProviderRequest{ CcipProviderArgs: &pb.CCIPProviderArgs{ @@ -297,6 +298,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro ChainWriterConfig: cargs.ChainWriterConfig, OffRampAddress: cargs.OffRampAddress, PluginType: cargs.PluginType, + Syncs: ccipProvider.GetSyncs(), // restore state }, }) if err != nil { @@ -305,7 +307,8 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro return reply.CcipProviderID, nil, nil }) - return ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), cc), nil + ccipProvider = ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), cc) + return ccipProvider, nil } func (r *relayerClient) LatestHead(ctx context.Context) (types.Head, error) { From bcf1a79d1c15a6bd349413d51d1258b7ee7725c3 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 9 Sep 2025 10:13:24 -0500 Subject: [PATCH 2/2] pkg/loop/internal/relayer: update relayerServer.NewCCIPProvider --- pkg/loop/internal/relayer/relayer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 39ad4b5cc..9a3657db0 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -738,6 +738,13 @@ func (r *relayerServer) NewCCIPProvider(ctx context.Context, request *pb.NewCCIP return nil, err } + for _, s := range rargs.Syncs { + err = provider.ChainAccessor().Sync(ctx, ...) + if err != nil { + return nil, err + } + } + const name = "CCIPProvider" id, _, err := r.ServeNew(name, func(s *grpc.Server) { ccipocr3.RegisterProviderServices(s, provider)