Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/loop/internal/example-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *pluginRelayer) HealthReport() map[string]error { return map[string]erro

func (p *pluginRelayer) Name() string { return p.lggr.Name() }

func (p *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, cr core.CapabilitiesRegistry) (loop.Relayer, error) {
func (p *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, cr core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (loop.Relayer, error) {
return &relayer{lggr: logger.Named(p.lggr, "Relayer"), ds: p.ds}, nil
}

Expand Down Expand Up @@ -142,3 +142,7 @@ func (r *relayer) NewPluginProvider(ctx context.Context, args types.RelayArgs, a
func (r *relayer) NewLLOProvider(ctx context.Context, args types.RelayArgs, args2 types.PluginArgs) (types.LLOProvider, error) {
return nil, errors.New("unimplemented")
}

func (r *relayer) NewCCIPProvider(ctx context.Context, args types.CCIPProviderArgs) (types.CCIPProvider, error) {
return nil, errors.New("unimplemented")
}
1 change: 1 addition & 0 deletions pkg/loop/internal/pb/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pipeline_runner.proto
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative keyvalue_store.proto
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative validate_config.proto
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative extra_data_codec_registry.proto
package pb
27 changes: 18 additions & 9 deletions pkg/loop/internal/pb/relayer.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/loop/internal/pb/relayer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message NewRelayerRequest {
uint32 keystoreID = 2;
uint32 capabilityRegistryID = 3;
uint32 keystoreCSAID = 4;
uint32 extraDataCodecRegistryServiceID = 5;
}

message NewRelayerReply {
Expand Down
57 changes: 48 additions & 9 deletions pkg/loop/internal/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
tonpb "github.com/smartcontractkit/chainlink-common/pkg/chains/ton"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec"

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / build-test

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / build-race-tests

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:

Check failure on line 20 in pkg/loop/internal/relayer/relayer.go

View workflow job for this annotation

GitHub Actions / benchmark

no required module provides package github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/codec; to add it:
ks "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
Expand Down Expand Up @@ -49,7 +50,7 @@
return &PluginRelayerClient{PluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), ServiceClient: goplugin.NewServiceClient(pc.BrokerExt, pc)}
}

func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (looptypes.Relayer, error) {
func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (looptypes.Relayer, error) {
cc := p.NewClientConn("Relayer", func(ctx context.Context) (relayerID uint32, deps net.Resources, err error) {
var ksRes net.Resource
ksID, ksRes, err := p.ServeNew("Keystore", func(s *grpc.Server) {
Expand Down Expand Up @@ -77,11 +78,24 @@
}
deps.Add(capabilityRegistryResource)

var edcrsID uint32
if edcrs != nil {
var edcrsRes net.Resource
edcrsID, edcrsRes, err = p.ServeNew("ExtraDataCodecRegistryService", func(s *grpc.Server) {
pb.RegisterExtraDataCodecRegistryServiceServer(s, codec.NewExtraDataCodecRegistryServer(p.BrokerExt, edcrs))
})
if err != nil {
return 0, nil, fmt.Errorf("failed to serve new extra data codec registry service: %w", err)
}
deps.Add(edcrsRes)
}

reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{
Config: config,
KeystoreID: ksID,
KeystoreCSAID: ksCSAID,
CapabilityRegistryID: capabilityRegistryID,
Config: config,
KeystoreID: ksID,
KeystoreCSAID: ksCSAID,
CapabilityRegistryID: capabilityRegistryID,
ExtraDataCodecRegistryServiceID: edcrsID,
})
if err != nil {
return 0, nil, fmt.Errorf("Failed to create relayer client: failed request: %w", err)
Expand Down Expand Up @@ -132,19 +146,44 @@
crRes := net.Resource{Closer: capRegistryConn, Name: "CapabilityRegistry"}
capRegistry := capability.NewCapabilitiesRegistryClient(capRegistryConn, p.BrokerExt)

r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), ks.NewClient(ksCSAConn), capRegistry)
var edcrs types.ExtraDataCodecRegistryService
var edcrsRes net.Resource
if request.ExtraDataCodecRegistryServiceID != 0 {
edcrsConn, err := p.Dial(request.ExtraDataCodecRegistryServiceID)
if err != nil {
p.CloseAll(ksRes, ksCSARes, crRes)
return nil, net.ErrConnDial{Name: "ExtraDataCodecRegistryService", ID: request.ExtraDataCodecRegistryServiceID, Err: err}
}
edcrsRes = net.Resource{Closer: edcrsConn, Name: "ExtraDataCodecRegistryService"}
edcrs = codec.NewExtraDataCodecRegistryClient(edcrsConn, p.BrokerExt)
}

r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), ks.NewClient(ksCSAConn), capRegistry, edcrs)
if err != nil {
p.CloseAll(ksRes, ksCSARes, crRes)
if edcrsRes.Name != "" {
p.CloseAll(ksRes, ksCSARes, crRes, edcrsRes)
} else {
p.CloseAll(ksRes, ksCSARes, crRes)
}
return nil, err
}
err = r.Start(ctx)
if err != nil {
p.CloseAll(ksRes, ksCSARes, crRes)
if edcrsRes.Name != "" {
p.CloseAll(ksRes, ksCSARes, crRes, edcrsRes)
} else {
p.CloseAll(ksRes, ksCSARes, crRes)
}
return nil, err
}

const name = "Relayer"
rRes := net.Resource{Closer: r, Name: name}
var resources []net.Resource
resources = append(resources, rRes, ksRes, ksCSARes, crRes)
if edcrsRes.Name != "" {
resources = append(resources, edcrsRes)
}
id, _, err := p.ServeNew(name, func(s *grpc.Server) {
pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: r})
pb.RegisterRelayerServer(s, newChainRelayerServer(r, p.BrokerExt))
Expand All @@ -154,7 +193,7 @@
if tonService, ok := r.(types.TONService); ok {
tonpb.RegisterTONServer(s, newTONServer(tonService, p.BrokerExt))
}
}, rRes, ksRes, ksCSARes, crRes)
}, resources...)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/loop/internal/relayer/test/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s staticPluginRelayer) HealthReport() map[string]error {
return hp
}

func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (looptypes.Relayer, error) {
func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (looptypes.Relayer, error) {
if s.relayer.StaticChecks && config != ConfigTOML {
return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config)
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func newRelayArgsWithProviderType(_type types.OCR2PluginType) types.RelayArgs {
func RunPlugin(t *testing.T, p looptypes.PluginRelayer) {
t.Run("Relayer", func(t *testing.T) {
ctx := t.Context()
relayer, err := p.NewRelayer(ctx, ConfigTOML, keystoretest.Keystore, keystoretest.Keystore, nil)
relayer, err := p.NewRelayer(ctx, ConfigTOML, keystoretest.Keystore, keystoretest.Keystore, nil, nil)
require.NoError(t, err)
servicetest.Run(t, relayer)
Run(t, relayer)
Expand Down Expand Up @@ -542,7 +542,7 @@ func RunFuzzPluginRelayer(f *testing.F, relayerFunc func(*testing.T) looptypes.P
}

ctx := t.Context()
_, err := relayerFunc(t).NewRelayer(ctx, fConfig, keystore, keystore, nil)
_, err := relayerFunc(t).NewRelayer(ctx, fConfig, keystore, keystore, nil, nil)

grpcUnavailableErr(t, err)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/loop/internal/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type PluginRelayer interface {
services.Service
NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (Relayer, error)
NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) (Relayer, error)
}

type MedianProvider interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/loop/relayer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type RelayerService struct {

// NewRelayerService returns a new [*RelayerService].
// cmd must return a new exec.Cmd each time it is called.
func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore core.Keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) *RelayerService {
func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore core.Keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry, edcrs types.ExtraDataCodecRegistryService) *RelayerService {
newService := func(ctx context.Context, instance any) (Relayer, services.HealthReporter, error) {
plug, ok := instance.(PluginRelayer)
if !ok {
return nil, nil, fmt.Errorf("expected PluginRelayer but got %T", instance)
}
r, err := plug.NewRelayer(ctx, config, keystore, csaKeystore, capabilityRegistry)
r, err := plug.NewRelayer(ctx, config, keystore, csaKeystore, capabilityRegistry, edcrs)
if err != nil {
return nil, nil, fmt.Errorf("failed to create Relayer: %w", err)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/types/provider_ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type CCIPFactoryGenerator interface {
CCIPExecutionFactoryGenerator
}

// CCIPProvider is a product-specific interface that exposes the necessary components
// for running CCIP on a specific chain/chain family. It includes access to the ChainAccessor,
type CCIPProvider interface {
services.Service
ChainAccessor() ccipocr3.ChainAccessor
Expand All @@ -65,3 +67,17 @@ type CCIPProviderArgs struct {
OffRampAddress string
PluginType uint32
}

// ExtraDataCodecRegistryService maintains a registry of SourceChainExtraDataCodec instances by chain family.
// It implements the Service interface and manages the lifecycle of codec registrations.
type ExtraDataCodecRegistryService interface {
services.Service
// RegisterChainFamily pre-registers a chain family that will be initialized later.
RegisterChainFamily(chainFamily string)
// SetSourceChainCodec registers a SourceChainExtraDataCodec for the given chain family
// and marks it as initialized.
SetSourceChainCodec(chainFamily string, codec ccipocr3.SourceChainExtraDataCodec)
// GetExtraDataCodec returns the complete ExtraDataCodec map.
// This includes all registered chain families, with no-op codecs for uninitialized ones.
GetExtraDataCodec() (ccipocr3.ExtraDataCodec, error)
}
4 changes: 2 additions & 2 deletions pkg/types/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ type Relayer interface {

NewOCR3CapabilityProvider(ctx context.Context, rargs RelayArgs, pargs PluginArgs) (OCR3CapabilityProvider, error)

NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs) (CCIPProvider, error)
NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs, edcs ExtraDataCodecRegistryService) (CCIPProvider, error)
}

var _ Relayer = &UnimplementedRelayer{}
Expand Down Expand Up @@ -366,6 +366,6 @@ func (u *UnimplementedRelayer) NewOCR3CapabilityProvider(ctx context.Context, ra
return nil, status.Errorf(codes.Unimplemented, "method NewOCR3CapabilityProvider not implemented")
}

func (u *UnimplementedRelayer) NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs) (CCIPProvider, error) {
func (u *UnimplementedRelayer) NewCCIPProvider(ctx context.Context, cargs CCIPProviderArgs, edcs ExtraDataCodecRegistryService) (CCIPProvider, error) {
return nil, status.Errorf(codes.Unimplemented, "method NewCCIPProvider not implemented")
}
Loading