From c4c3fe3fb86ab5d491cf44d75252057bb54215a2 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Sun, 14 Sep 2025 12:23:29 -0300 Subject: [PATCH 1/2] CCIP - add extra data codec registry service interface --- pkg/loop/internal/relayer/relayer.go | 4 ++-- pkg/loop/internal/relayer/test/relayer.go | 2 +- pkg/loop/internal/types/types.go | 2 +- pkg/loop/mocks/relayer.go | 29 ++++++++++++----------- pkg/loop/relayer_service.go | 4 ++-- pkg/types/provider_ccip.go | 16 +++++++++++++ pkg/types/relayer.go | 4 ++-- 7 files changed, 39 insertions(+), 22 deletions(-) diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index a97a3e8019..93d3a8f87c 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -288,7 +288,7 @@ func (r *relayerClient) NewLLOProvider(ctx context.Context, rargs types.RelayArg return nil, fmt.Errorf("llo provider not supported: %w", errors.ErrUnsupported) } -func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { +func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { var ccipProvider *ccipocr3.CCIPProviderClient cc := r.NewClientConn("CCIPProvider", func(ctx context.Context) (uint32, net.Resources, error) { persistedSyncs := ccipProvider.GetSyncRequests() @@ -734,7 +734,7 @@ func (r *relayerServer) NewCCIPProvider(ctx context.Context, request *pb.NewCCIP PluginType: rargs.PluginType, } - provider, err := r.impl.NewCCIPProvider(ctx, ccipProviderArgs) + provider, err := r.impl.NewCCIPProvider(ctx, ccipProviderArgs, nil) if err != nil { return nil, err } diff --git a/pkg/loop/internal/relayer/test/relayer.go b/pkg/loop/internal/relayer/test/relayer.go index 5a181fdbb0..36e2b6aa31 100644 --- a/pkg/loop/internal/relayer/test/relayer.go +++ b/pkg/loop/internal/relayer/test/relayer.go @@ -313,7 +313,7 @@ func (s staticRelayer) NewLLOProvider(ctx context.Context, r types.RelayArgs, p return nil, errors.New("not implemented") } -func (s staticRelayer) NewCCIPProvider(ctx context.Context, r types.CCIPProviderArgs) (types.CCIPProvider, error) { +func (s staticRelayer) NewCCIPProvider(ctx context.Context, r types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { ccipProviderArgs := types.CCIPProviderArgs{ ExternalJobID: s.relayArgs.ExternalJobID, ContractReaderConfig: s.contractReaderConfig, diff --git a/pkg/loop/internal/types/types.go b/pkg/loop/internal/types/types.go index 0ffb588c8c..faad347ccd 100644 --- a/pkg/loop/internal/types/types.go +++ b/pkg/loop/internal/types/types.go @@ -58,7 +58,7 @@ type Relayer interface { NewConfigProvider(context.Context, types.RelayArgs) (types.ConfigProvider, error) NewPluginProvider(context.Context, types.RelayArgs, types.PluginArgs) (types.PluginProvider, error) NewLLOProvider(context.Context, types.RelayArgs, types.PluginArgs) (types.LLOProvider, error) - NewCCIPProvider(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error) + NewCCIPProvider(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) } // Keystore This interface contains all the keystore GRPC functionality, keystore.Keystore is meant to be exposed to consumers and the keystore.Management interface in exposed only to the core node diff --git a/pkg/loop/mocks/relayer.go b/pkg/loop/mocks/relayer.go index 7f0d29d563..bcf373cd48 100644 --- a/pkg/loop/mocks/relayer.go +++ b/pkg/loop/mocks/relayer.go @@ -460,9 +460,9 @@ func (_c *Relayer_Name_Call) RunAndReturn(run func() string) *Relayer_Name_Call return _c } -// NewCCIPProvider provides a mock function with given fields: _a0, _a1 -func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderArgs) (types.CCIPProvider, error) { - ret := _m.Called(_a0, _a1) +// NewCCIPProvider provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderArgs, _a2 types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { + ret := _m.Called(_a0, _a1, _a2) if len(ret) == 0 { panic("no return value specified for NewCCIPProvider") @@ -470,19 +470,19 @@ func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderAr var r0 types.CCIPProvider var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error)); ok { - return rf(_a0, _a1) + if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error)); ok { + return rf(_a0, _a1, _a2) } - if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs) types.CCIPProvider); ok { - r0 = rf(_a0, _a1) + if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) types.CCIPProvider); ok { + r0 = rf(_a0, _a1, _a2) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(types.CCIPProvider) } } - if rf, ok := ret.Get(1).(func(context.Context, types.CCIPProviderArgs) error); ok { - r1 = rf(_a0, _a1) + if rf, ok := ret.Get(1).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) error); ok { + r1 = rf(_a0, _a1, _a2) } else { r1 = ret.Error(1) } @@ -498,13 +498,14 @@ type Relayer_NewCCIPProvider_Call struct { // NewCCIPProvider is a helper method to define mock.On call // - _a0 context.Context // - _a1 types.CCIPProviderArgs -func (_e *Relayer_Expecter) NewCCIPProvider(_a0 interface{}, _a1 interface{}) *Relayer_NewCCIPProvider_Call { - return &Relayer_NewCCIPProvider_Call{Call: _e.mock.On("NewCCIPProvider", _a0, _a1)} +// - _a2 types.ExtraDataCodecRegistryService +func (_e *Relayer_Expecter) NewCCIPProvider(_a0 interface{}, _a1 interface{}, _a2 interface{}) *Relayer_NewCCIPProvider_Call { + return &Relayer_NewCCIPProvider_Call{Call: _e.mock.On("NewCCIPProvider", _a0, _a1, _a2)} } -func (_c *Relayer_NewCCIPProvider_Call) Run(run func(_a0 context.Context, _a1 types.CCIPProviderArgs)) *Relayer_NewCCIPProvider_Call { +func (_c *Relayer_NewCCIPProvider_Call) Run(run func(_a0 context.Context, _a1 types.CCIPProviderArgs, _a2 types.ExtraDataCodecRegistryService)) *Relayer_NewCCIPProvider_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.CCIPProviderArgs)) + run(args[0].(context.Context), args[1].(types.CCIPProviderArgs), args[2].(types.ExtraDataCodecRegistryService)) }) return _c } @@ -514,7 +515,7 @@ func (_c *Relayer_NewCCIPProvider_Call) Return(_a0 types.CCIPProvider, _a1 error return _c } -func (_c *Relayer_NewCCIPProvider_Call) RunAndReturn(run func(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error)) *Relayer_NewCCIPProvider_Call { +func (_c *Relayer_NewCCIPProvider_Call) RunAndReturn(run func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error)) *Relayer_NewCCIPProvider_Call { _c.Call.Return(run) return _c } diff --git a/pkg/loop/relayer_service.go b/pkg/loop/relayer_service.go index 61a5a3b4be..5f9bf1d5bb 100644 --- a/pkg/loop/relayer_service.go +++ b/pkg/loop/relayer_service.go @@ -85,11 +85,11 @@ func (r *RelayerService) NewLLOProvider(ctx context.Context, rargs types.RelayAr return r.Service.NewLLOProvider(ctx, rargs, pargs) } -func (r *RelayerService) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { +func (r *RelayerService) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { if err := r.WaitCtx(ctx); err != nil { return nil, err } - return r.Service.NewCCIPProvider(ctx, cargs) + return r.Service.NewCCIPProvider(ctx, cargs, edcs) } func (r *RelayerService) LatestHead(ctx context.Context) (types.Head, error) { diff --git a/pkg/types/provider_ccip.go b/pkg/types/provider_ccip.go index 461fbbe4cc..b6f27e6e03 100644 --- a/pkg/types/provider_ccip.go +++ b/pkg/types/provider_ccip.go @@ -49,6 +49,8 @@ type CCIPFactoryGenerator interface { CCIPExecutionFactoryGenerator } +// CCIPProvider is a product-specific interface that exposes the necessary components +// for running CCIP on a specific chain/chain family. It includes access to the ChainAccessor, type CCIPProvider interface { services.Service ChainAccessor() ccipocr3.ChainAccessor @@ -65,3 +67,17 @@ type CCIPProviderArgs struct { OffRampAddress string PluginType uint32 } + +// ExtraDataCodecRegistryService maintains a registry of SourceChainExtraDataCodec instances by chain family. +// It implements the Service interface and manages the lifecycle of codec registrations. +type ExtraDataCodecRegistryService interface { + services.Service + // RegisterChainFamily pre-registers a chain family that will be initialized later. + RegisterChainFamily(chainFamily string) + // SetSourceChainCodec registers a SourceChainExtraDataCodec for the given chain family + // and marks it as initialized. + SetSourceChainCodec(chainFamily string, codec ccipocr3.SourceChainExtraDataCodec) + // GetExtraDataCodec returns the complete ExtraDataCodec map. + // This includes all registered chain families, with no-op codecs for uninitialized ones. + GetExtraDataCodec() (ccipocr3.ExtraDataCodec, error) +} diff --git a/pkg/types/relayer.go b/pkg/types/relayer.go index f6112b948a..59f8db1f7a 100644 --- a/pkg/types/relayer.go +++ b/pkg/types/relayer.go @@ -255,7 +255,7 @@ type Relayer interface { NewOCR3CapabilityProvider(ctx context.Context, rargs RelayArgs, pargs PluginArgs) (OCR3CapabilityProvider, error) - NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs) (CCIPProvider, error) + NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs, edcs ExtraDataCodecRegistryService) (CCIPProvider, error) } var _ Relayer = &UnimplementedRelayer{} @@ -366,6 +366,6 @@ func (u *UnimplementedRelayer) NewOCR3CapabilityProvider(ctx context.Context, ra return nil, status.Errorf(codes.Unimplemented, "method NewOCR3CapabilityProvider not implemented") } -func (u *UnimplementedRelayer) NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs) (CCIPProvider, error) { +func (u *UnimplementedRelayer) NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs, edcs ExtraDataCodecRegistryService) (CCIPProvider, error) { return nil, status.Errorf(codes.Unimplemented, "method NewCCIPProvider not implemented") } From 92e55482ddca7d8053418610a8cbeb0cdefe940e Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Sun, 14 Sep 2025 13:22:40 -0300 Subject: [PATCH 2/2] add extraDataCodecRegistry service to NewRelayer interface --- pkg/loop/internal/example-relay/main.go | 6 ++- pkg/loop/internal/pb/generate.go | 1 + pkg/loop/internal/pb/relayer.pb.go | 27 ++++++---- pkg/loop/internal/pb/relayer.proto | 1 + pkg/loop/internal/relayer/relayer.go | 61 +++++++++++++++++++---- pkg/loop/internal/relayer/test/relayer.go | 8 +-- pkg/loop/internal/types/types.go | 4 +- pkg/loop/mocks/relayer.go | 29 ++++++----- pkg/loop/relayer_service.go | 8 +-- 9 files changed, 99 insertions(+), 46 deletions(-) diff --git a/pkg/loop/internal/example-relay/main.go b/pkg/loop/internal/example-relay/main.go index b61fd86d1b..8e18d6c594 100644 --- a/pkg/loop/internal/example-relay/main.go +++ b/pkg/loop/internal/example-relay/main.go @@ -60,7 +60,7 @@ func (p *pluginRelayer) HealthReport() map[string]error { return map[string]erro func (p *pluginRelayer) Name() string { return p.lggr.Name() } -func (p *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, cr core.CapabilitiesRegistry) (loop.Relayer, error) { +func (p *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, cr core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (loop.Relayer, error) { return &relayer{lggr: logger.Named(p.lggr, "Relayer"), ds: p.ds}, nil } @@ -142,3 +142,7 @@ func (r *relayer) NewPluginProvider(ctx context.Context, args types.RelayArgs, a func (r *relayer) NewLLOProvider(ctx context.Context, args types.RelayArgs, args2 types.PluginArgs) (types.LLOProvider, error) { return nil, errors.New("unimplemented") } + +func (r *relayer) NewCCIPProvider(ctx context.Context, args types.CCIPProviderArgs) (types.CCIPProvider, error) { + return nil, errors.New("unimplemented") +} diff --git a/pkg/loop/internal/pb/generate.go b/pkg/loop/internal/pb/generate.go index 1b274ab304..97f91090bc 100644 --- a/pkg/loop/internal/pb/generate.go +++ b/pkg/loop/internal/pb/generate.go @@ -10,4 +10,5 @@ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pipeline_runner.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative keyvalue_store.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative validate_config.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative extra_data_codec_registry.proto package pb diff --git a/pkg/loop/internal/pb/relayer.pb.go b/pkg/loop/internal/pb/relayer.pb.go index c3a3d8937a..1b395c3d97 100644 --- a/pkg/loop/internal/pb/relayer.pb.go +++ b/pkg/loop/internal/pb/relayer.pb.go @@ -24,13 +24,14 @@ const ( ) type NewRelayerRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Config string `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` // toml (is chain instance config enough?) - KeystoreID uint32 `protobuf:"varint,2,opt,name=keystoreID,proto3" json:"keystoreID,omitempty"` - CapabilityRegistryID uint32 `protobuf:"varint,3,opt,name=capabilityRegistryID,proto3" json:"capabilityRegistryID,omitempty"` - KeystoreCSAID uint32 `protobuf:"varint,4,opt,name=keystoreCSAID,proto3" json:"keystoreCSAID,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Config string `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` // toml (is chain instance config enough?) + KeystoreID uint32 `protobuf:"varint,2,opt,name=keystoreID,proto3" json:"keystoreID,omitempty"` + CapabilityRegistryID uint32 `protobuf:"varint,3,opt,name=capabilityRegistryID,proto3" json:"capabilityRegistryID,omitempty"` + KeystoreCSAID uint32 `protobuf:"varint,4,opt,name=keystoreCSAID,proto3" json:"keystoreCSAID,omitempty"` + ExtraDataCodecRegistryServiceID uint32 `protobuf:"varint,5,opt,name=extraDataCodecRegistryServiceID,proto3" json:"extraDataCodecRegistryServiceID,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *NewRelayerRequest) Reset() { @@ -91,6 +92,13 @@ func (x *NewRelayerRequest) GetKeystoreCSAID() uint32 { return 0 } +func (x *NewRelayerRequest) GetExtraDataCodecRegistryServiceID() uint32 { + if x != nil { + return x.ExtraDataCodecRegistryServiceID + } + return 0 +} + type NewRelayerReply struct { state protoimpl.MessageState `protogen:"open.v1"` RelayerID uint32 `protobuf:"varint,1,opt,name=relayerID,proto3" json:"relayerID,omitempty"` @@ -2701,14 +2709,15 @@ var File_loop_internal_pb_relayer_proto protoreflect.FileDescriptor const file_loop_internal_pb_relayer_proto_rawDesc = "" + "\n" + - "\x1eloop/internal/pb/relayer.proto\x12\x04loop\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a&loop/internal/pb/contract_reader.proto\"\xa5\x01\n" + + "\x1eloop/internal/pb/relayer.proto\x12\x04loop\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a&loop/internal/pb/contract_reader.proto\"\xef\x01\n" + "\x11NewRelayerRequest\x12\x16\n" + "\x06config\x18\x01 \x01(\tR\x06config\x12\x1e\n" + "\n" + "keystoreID\x18\x02 \x01(\rR\n" + "keystoreID\x122\n" + "\x14capabilityRegistryID\x18\x03 \x01(\rR\x14capabilityRegistryID\x12$\n" + - "\rkeystoreCSAID\x18\x04 \x01(\rR\rkeystoreCSAID\"/\n" + + "\rkeystoreCSAID\x18\x04 \x01(\rR\rkeystoreCSAID\x12H\n" + + "\x1fextraDataCodecRegistryServiceID\x18\x05 \x01(\rR\x1fextraDataCodecRegistryServiceID\"/\n" + "\x0fNewRelayerReply\x12\x1c\n" + "\trelayerID\x18\x01 \x01(\rR\trelayerID\"\xbf\x01\n" + "\tRelayArgs\x12$\n" + diff --git a/pkg/loop/internal/pb/relayer.proto b/pkg/loop/internal/pb/relayer.proto index 5ee90a6f27..b6d8b267c3 100644 --- a/pkg/loop/internal/pb/relayer.proto +++ b/pkg/loop/internal/pb/relayer.proto @@ -17,6 +17,7 @@ message NewRelayerRequest { uint32 keystoreID = 2; uint32 capabilityRegistryID = 3; uint32 keystoreCSAID = 4; + uint32 extraDataCodecRegistryServiceID = 5; } message NewRelayerReply { diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 93d3a8f87c..f3a3f303e0 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -17,6 +17,7 @@ import ( tonpb "github.com/smartcontractkit/chainlink-common/pkg/chains/ton" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability" + "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec" ks "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net" @@ -49,7 +50,7 @@ func NewPluginRelayerClient(brokerCfg net.BrokerConfig) *PluginRelayerClient { return &PluginRelayerClient{PluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), ServiceClient: goplugin.NewServiceClient(pc.BrokerExt, pc)} } -func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (looptypes.Relayer, error) { +func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (looptypes.Relayer, error) { cc := p.NewClientConn("Relayer", func(ctx context.Context) (relayerID uint32, deps net.Resources, err error) { var ksRes net.Resource ksID, ksRes, err := p.ServeNew("Keystore", func(s *grpc.Server) { @@ -77,11 +78,24 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key } deps.Add(capabilityRegistryResource) + var edcrsID uint32 + if edcrs != nil { + var edcrsRes net.Resource + edcrsID, edcrsRes, err = p.ServeNew("ExtraDataCodecRegistryService", func(s *grpc.Server) { + pb.RegisterExtraDataCodecRegistryServiceServer(s, codec.NewExtraDataCodecRegistryServer(p.BrokerExt, edcrs)) + }) + if err != nil { + return 0, nil, fmt.Errorf("failed to serve new extra data codec registry service: %w", err) + } + deps.Add(edcrsRes) + } + reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{ - Config: config, - KeystoreID: ksID, - KeystoreCSAID: ksCSAID, - CapabilityRegistryID: capabilityRegistryID, + Config: config, + KeystoreID: ksID, + KeystoreCSAID: ksCSAID, + CapabilityRegistryID: capabilityRegistryID, + ExtraDataCodecRegistryServiceID: edcrsID, }) if err != nil { return 0, nil, fmt.Errorf("Failed to create relayer client: failed request: %w", err) @@ -132,19 +146,44 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel crRes := net.Resource{Closer: capRegistryConn, Name: "CapabilityRegistry"} capRegistry := capability.NewCapabilitiesRegistryClient(capRegistryConn, p.BrokerExt) - r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), ks.NewClient(ksCSAConn), capRegistry) + var edcrs types.ExtraDataCodecRegistryService + var edcrsRes net.Resource + if request.ExtraDataCodecRegistryServiceID != 0 { + edcrsConn, err := p.Dial(request.ExtraDataCodecRegistryServiceID) + if err != nil { + p.CloseAll(ksRes, ksCSARes, crRes) + return nil, net.ErrConnDial{Name: "ExtraDataCodecRegistryService", ID: request.ExtraDataCodecRegistryServiceID, Err: err} + } + edcrsRes = net.Resource{Closer: edcrsConn, Name: "ExtraDataCodecRegistryService"} + edcrs = codec.NewExtraDataCodecRegistryClient(edcrsConn, p.BrokerExt) + } + + r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), ks.NewClient(ksCSAConn), capRegistry, edcrs) if err != nil { - p.CloseAll(ksRes, ksCSARes, crRes) + if edcrsRes.Name != "" { + p.CloseAll(ksRes, ksCSARes, crRes, edcrsRes) + } else { + p.CloseAll(ksRes, ksCSARes, crRes) + } return nil, err } err = r.Start(ctx) if err != nil { - p.CloseAll(ksRes, ksCSARes, crRes) + if edcrsRes.Name != "" { + p.CloseAll(ksRes, ksCSARes, crRes, edcrsRes) + } else { + p.CloseAll(ksRes, ksCSARes, crRes) + } return nil, err } const name = "Relayer" rRes := net.Resource{Closer: r, Name: name} + var resources []net.Resource + resources = append(resources, rRes, ksRes, ksCSARes, crRes) + if edcrsRes.Name != "" { + resources = append(resources, edcrsRes) + } id, _, err := p.ServeNew(name, func(s *grpc.Server) { pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: r}) pb.RegisterRelayerServer(s, newChainRelayerServer(r, p.BrokerExt)) @@ -154,7 +193,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel if tonService, ok := r.(types.TONService); ok { tonpb.RegisterTONServer(s, newTONServer(tonService, p.BrokerExt)) } - }, rRes, ksRes, ksCSARes, crRes) + }, resources...) if err != nil { return nil, err } @@ -288,7 +327,7 @@ func (r *relayerClient) NewLLOProvider(ctx context.Context, rargs types.RelayArg return nil, fmt.Errorf("llo provider not supported: %w", errors.ErrUnsupported) } -func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { +func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { var ccipProvider *ccipocr3.CCIPProviderClient cc := r.NewClientConn("CCIPProvider", func(ctx context.Context) (uint32, net.Resources, error) { persistedSyncs := ccipProvider.GetSyncRequests() @@ -734,7 +773,7 @@ func (r *relayerServer) NewCCIPProvider(ctx context.Context, request *pb.NewCCIP PluginType: rargs.PluginType, } - provider, err := r.impl.NewCCIPProvider(ctx, ccipProviderArgs, nil) + provider, err := r.impl.NewCCIPProvider(ctx, ccipProviderArgs) if err != nil { return nil, err } diff --git a/pkg/loop/internal/relayer/test/relayer.go b/pkg/loop/internal/relayer/test/relayer.go index 36e2b6aa31..e6ac291bb1 100644 --- a/pkg/loop/internal/relayer/test/relayer.go +++ b/pkg/loop/internal/relayer/test/relayer.go @@ -158,7 +158,7 @@ func (s staticPluginRelayer) HealthReport() map[string]error { return hp } -func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (looptypes.Relayer, error) { +func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (looptypes.Relayer, error) { if s.relayer.StaticChecks && config != ConfigTOML { return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config) } @@ -313,7 +313,7 @@ func (s staticRelayer) NewLLOProvider(ctx context.Context, r types.RelayArgs, p return nil, errors.New("not implemented") } -func (s staticRelayer) NewCCIPProvider(ctx context.Context, r types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { +func (s staticRelayer) NewCCIPProvider(ctx context.Context, r types.CCIPProviderArgs) (types.CCIPProvider, error) { ccipProviderArgs := types.CCIPProviderArgs{ ExternalJobID: s.relayArgs.ExternalJobID, ContractReaderConfig: s.contractReaderConfig, @@ -495,7 +495,7 @@ func newRelayArgsWithProviderType(_type types.OCR2PluginType) types.RelayArgs { func RunPlugin(t *testing.T, p looptypes.PluginRelayer) { t.Run("Relayer", func(t *testing.T) { ctx := t.Context() - relayer, err := p.NewRelayer(ctx, ConfigTOML, keystoretest.Keystore, keystoretest.Keystore, nil) + relayer, err := p.NewRelayer(ctx, ConfigTOML, keystoretest.Keystore, keystoretest.Keystore, nil, nil) require.NoError(t, err) servicetest.Run(t, relayer) Run(t, relayer) @@ -542,7 +542,7 @@ func RunFuzzPluginRelayer(f *testing.F, relayerFunc func(*testing.T) looptypes.P } ctx := t.Context() - _, err := relayerFunc(t).NewRelayer(ctx, fConfig, keystore, keystore, nil) + _, err := relayerFunc(t).NewRelayer(ctx, fConfig, keystore, keystore, nil, nil) grpcUnavailableErr(t, err) }) diff --git a/pkg/loop/internal/types/types.go b/pkg/loop/internal/types/types.go index faad347ccd..71d8cae66b 100644 --- a/pkg/loop/internal/types/types.go +++ b/pkg/loop/internal/types/types.go @@ -11,7 +11,7 @@ import ( type PluginRelayer interface { services.Service - NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (Relayer, error) + NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (Relayer, error) } type MedianProvider interface { @@ -58,7 +58,7 @@ type Relayer interface { NewConfigProvider(context.Context, types.RelayArgs) (types.ConfigProvider, error) NewPluginProvider(context.Context, types.RelayArgs, types.PluginArgs) (types.PluginProvider, error) NewLLOProvider(context.Context, types.RelayArgs, types.PluginArgs) (types.LLOProvider, error) - NewCCIPProvider(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) + NewCCIPProvider(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error) } // Keystore This interface contains all the keystore GRPC functionality, keystore.Keystore is meant to be exposed to consumers and the keystore.Management interface in exposed only to the core node diff --git a/pkg/loop/mocks/relayer.go b/pkg/loop/mocks/relayer.go index bcf373cd48..7f0d29d563 100644 --- a/pkg/loop/mocks/relayer.go +++ b/pkg/loop/mocks/relayer.go @@ -460,9 +460,9 @@ func (_c *Relayer_Name_Call) RunAndReturn(run func() string) *Relayer_Name_Call return _c } -// NewCCIPProvider provides a mock function with given fields: _a0, _a1, _a2 -func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderArgs, _a2 types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { - ret := _m.Called(_a0, _a1, _a2) +// NewCCIPProvider provides a mock function with given fields: _a0, _a1 +func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderArgs) (types.CCIPProvider, error) { + ret := _m.Called(_a0, _a1) if len(ret) == 0 { panic("no return value specified for NewCCIPProvider") @@ -470,19 +470,19 @@ func (_m *Relayer) NewCCIPProvider(_a0 context.Context, _a1 types.CCIPProviderAr var r0 types.CCIPProvider var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error)); ok { - return rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error)); ok { + return rf(_a0, _a1) } - if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) types.CCIPProvider); ok { - r0 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context, types.CCIPProviderArgs) types.CCIPProvider); ok { + r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(types.CCIPProvider) } } - if rf, ok := ret.Get(1).(func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) error); ok { - r1 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(1).(func(context.Context, types.CCIPProviderArgs) error); ok { + r1 = rf(_a0, _a1) } else { r1 = ret.Error(1) } @@ -498,14 +498,13 @@ type Relayer_NewCCIPProvider_Call struct { // NewCCIPProvider is a helper method to define mock.On call // - _a0 context.Context // - _a1 types.CCIPProviderArgs -// - _a2 types.ExtraDataCodecRegistryService -func (_e *Relayer_Expecter) NewCCIPProvider(_a0 interface{}, _a1 interface{}, _a2 interface{}) *Relayer_NewCCIPProvider_Call { - return &Relayer_NewCCIPProvider_Call{Call: _e.mock.On("NewCCIPProvider", _a0, _a1, _a2)} +func (_e *Relayer_Expecter) NewCCIPProvider(_a0 interface{}, _a1 interface{}) *Relayer_NewCCIPProvider_Call { + return &Relayer_NewCCIPProvider_Call{Call: _e.mock.On("NewCCIPProvider", _a0, _a1)} } -func (_c *Relayer_NewCCIPProvider_Call) Run(run func(_a0 context.Context, _a1 types.CCIPProviderArgs, _a2 types.ExtraDataCodecRegistryService)) *Relayer_NewCCIPProvider_Call { +func (_c *Relayer_NewCCIPProvider_Call) Run(run func(_a0 context.Context, _a1 types.CCIPProviderArgs)) *Relayer_NewCCIPProvider_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.CCIPProviderArgs), args[2].(types.ExtraDataCodecRegistryService)) + run(args[0].(context.Context), args[1].(types.CCIPProviderArgs)) }) return _c } @@ -515,7 +514,7 @@ func (_c *Relayer_NewCCIPProvider_Call) Return(_a0 types.CCIPProvider, _a1 error return _c } -func (_c *Relayer_NewCCIPProvider_Call) RunAndReturn(run func(context.Context, types.CCIPProviderArgs, types.ExtraDataCodecRegistryService) (types.CCIPProvider, error)) *Relayer_NewCCIPProvider_Call { +func (_c *Relayer_NewCCIPProvider_Call) RunAndReturn(run func(context.Context, types.CCIPProviderArgs) (types.CCIPProvider, error)) *Relayer_NewCCIPProvider_Call { _c.Call.Return(run) return _c } diff --git a/pkg/loop/relayer_service.go b/pkg/loop/relayer_service.go index 5f9bf1d5bb..ea99ddfc8b 100644 --- a/pkg/loop/relayer_service.go +++ b/pkg/loop/relayer_service.go @@ -22,13 +22,13 @@ type RelayerService struct { // NewRelayerService returns a new [*RelayerService]. // cmd must return a new exec.Cmd each time it is called. -func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore core.Keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) *RelayerService { +func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore core.Keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) *RelayerService { newService := func(ctx context.Context, instance any) (Relayer, services.HealthReporter, error) { plug, ok := instance.(PluginRelayer) if !ok { return nil, nil, fmt.Errorf("expected PluginRelayer but got %T", instance) } - r, err := plug.NewRelayer(ctx, config, keystore, csaKeystore, capabilityRegistry) + r, err := plug.NewRelayer(ctx, config, keystore, csaKeystore, capabilityRegistry, edcrs) if err != nil { return nil, nil, fmt.Errorf("failed to create Relayer: %w", err) } @@ -85,11 +85,11 @@ func (r *RelayerService) NewLLOProvider(ctx context.Context, rargs types.RelayAr return r.Service.NewLLOProvider(ctx, rargs, pargs) } -func (r *RelayerService) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs, edcs types.ExtraDataCodecRegistryService) (types.CCIPProvider, error) { +func (r *RelayerService) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { if err := r.WaitCtx(ctx); err != nil { return nil, err } - return r.Service.NewCCIPProvider(ctx, cargs, edcs) + return r.Service.NewCCIPProvider(ctx, cargs) } func (r *RelayerService) LatestHead(ctx context.Context) (types.Head, error) {