Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 242 additions & 28 deletions pkg/capabilities/registry/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/Masterminds/semver/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand All @@ -18,8 +21,22 @@ var (
ErrCapabilityAlreadyExists = errors.New("capability already exists")
)

// atomicBaseCapability extends [capabilities.BaseCapability] to support atomic updates and forward client state checks.
type atomicBaseCapability interface {
capabilities.BaseCapability
Update(capabilities.BaseCapability) error
StateGetter
}

var _ StateGetter = (*grpc.ClientConn)(nil)

// StateGetter is implemented by GRPC client connections.
type StateGetter interface {
GetState() connectivity.State
}

type baseRegistry struct {
m map[string]capabilities.BaseCapability
m map[string]atomicBaseCapability
lggr logger.Logger
mu sync.RWMutex
}
Expand All @@ -28,7 +45,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)

func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
return &baseRegistry{
m: map[string]capabilities.BaseCapability{},
m: map[string]atomicBaseCapability{},
lggr: logger.Named(lggr, "registries.basic"),
}
}
Expand Down Expand Up @@ -142,46 +159,243 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
return err
}

switch info.CapabilityType {
case capabilities.CapabilityTypeTrigger:
_, ok := c.(capabilities.TriggerCapability)
if !ok {
return errors.New("trigger capability does not satisfy TriggerCapability interface")
id := info.ID
bc, ok := r.m[id]
if ok {
switch state := bc.GetState(); state {
case connectivity.Shutdown, connectivity.TransientFailure, connectivity.Idle:
// allow replace
Comment on lines +166 to +167
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to include Idle here, but it is apparently necessary.

default:
return fmt.Errorf("%w: id %s found in registry: state %s", ErrCapabilityAlreadyExists, id, state)
}
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
_, ok := c.(capabilities.ExecutableCapability)
if !ok {
return errors.New("action does not satisfy ExecutableCapability interface")
if err := bc.Update(c); err != nil {
return fmt.Errorf("failed to update capability %s: %w", id, err)
}
case capabilities.CapabilityTypeCombined:
_, ok := c.(capabilities.ExecutableAndTriggerCapability)
if !ok {
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
} else {
var ac atomicBaseCapability
switch info.CapabilityType {
case capabilities.CapabilityTypeTrigger:
ac = &atomicTriggerCapability{}
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
ac = &atomicExecuteCapability{}
case capabilities.CapabilityTypeCombined:
ac = &atomicExecuteAndTriggerCapability{}
default:
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
}
default:
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
}

id := info.ID
_, ok := r.m[id]
if ok {
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
if err := ac.Update(c); err != nil {
return err
}
r.m[id] = ac
}

r.m[id] = c
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
return nil
}

func (r *baseRegistry) Remove(_ context.Context, id string) error {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.m[id]
ac, ok := r.m[id]
if !ok {
return fmt.Errorf("unable to remove, capability not found: %s", id)
}

delete(r.m, id)
if err := ac.Update(nil); err != nil {
return fmt.Errorf("failed to remove capability %s: %w", id, err)
}
r.lggr.Infow("capability removed", "id", id)
return nil
}

var _ capabilities.TriggerCapability = &atomicTriggerCapability{}

type atomicTriggerCapability struct {
atomic.Pointer[capabilities.TriggerCapability]
}

func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
if c == nil {
a.Store(nil)
return nil
}
tc, ok := c.(capabilities.TriggerCapability)
if !ok {
return errors.New("trigger capability does not satisfy TriggerCapability interface")
}
a.Store(&tc)
return nil
}

func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
c := a.Load()
if c == nil {
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
}
return (*c).Info(ctx)
}

func (a *atomicTriggerCapability) GetState() connectivity.State {
c := a.Load()
if c == nil {
return connectivity.Shutdown
}
if sg, ok := (*c).(StateGetter); ok {
Copy link
Copy Markdown
Contributor

@krehermann krehermann Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need the type cast check here but not on 252, 260?

return sg.GetState()
}
return connectivity.State(-1) // unknown
}

func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
c := a.Load()
if c == nil {
return nil, errors.New("capability unavailable")
}
return (*c).RegisterTrigger(ctx, request)
}

func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).UnregisterTrigger(ctx, request)
}

var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}

type atomicExecuteCapability struct {
atomic.Pointer[capabilities.ExecutableCapability]
}

func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
if c == nil {
a.Store(nil)
return nil
}
tc, ok := c.(capabilities.ExecutableCapability)
if !ok {
return errors.New("action does not satisfy ExecutableCapability interface")
}
a.Store(&tc)
return nil
}

func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
c := a.Load()
if c == nil {
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
}
return (*c).Info(ctx)
}

func (a *atomicExecuteCapability) GetState() connectivity.State {
c := a.Load()
if c == nil {
return connectivity.Shutdown
}
if sg, ok := (*c).(StateGetter); ok {
return sg.GetState()
}
return connectivity.State(-1) // unknown
}

func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).RegisterToWorkflow(ctx, request)
}

func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).UnregisterFromWorkflow(ctx, request)
}

func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
c := a.Load()
if c == nil {
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
}
return (*c).Execute(ctx, request)
}

var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}

type atomicExecuteAndTriggerCapability struct {
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
}

func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
if c == nil {
a.Store(nil)
return nil
}
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
if !ok {
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
}
a.Store(&tc)
return nil
}

func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
c := a.Load()
if c == nil {
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
}
return (*c).Info(ctx)
}

func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
c := a.Load()
if c == nil {
return connectivity.Shutdown
}
if sg, ok := (*c).(StateGetter); ok {
return sg.GetState()
}
return connectivity.State(-1) // unknown
}

func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
c := a.Load()
if c == nil {
return nil, errors.New("capability unavailable")
}
return (*c).RegisterTrigger(ctx, request)
}

func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).UnregisterTrigger(ctx, request)
}

func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).RegisterToWorkflow(ctx, request)
}

func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).UnregisterFromWorkflow(ctx, request)
}

func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
c := a.Load()
if c == nil {
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
}
return (*c).Execute(ctx, request)
}
8 changes: 6 additions & 2 deletions pkg/capabilities/registry/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ func TestRegistry(t *testing.T) {

gc, err := r.Get(ctx, id)
require.NoError(t, err)
info, err := gc.Info(t.Context())
require.NoError(t, err)

assert.Equal(t, c, gc)
assert.Equal(t, c.CapabilityInfo, info)

cs, err := r.List(ctx)
require.NoError(t, err)
assert.Len(t, cs, 1)
assert.Equal(t, c, cs[0])
info, err = cs[0].Info(t.Context())
require.NoError(t, err)
assert.Equal(t, c.CapabilityInfo, info)
}

func TestRegistryCompatibleVersions(t *testing.T) {
Expand Down
15 changes: 10 additions & 5 deletions pkg/loop/internal/core/services/capability/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand All @@ -22,7 +23,7 @@ type TriggerCapabilityClient struct {
*baseCapabilityClient
}

func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) capabilities.TriggerCapability {
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) capabilities.TriggerCapability {
return &TriggerCapabilityClient{
triggerExecutableClient: newTriggerExecutableClient(brokerExt, conn),
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
Expand All @@ -39,7 +40,7 @@ type ExecutableCapability interface {
capabilities.BaseCapability
}

func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
return &ExecutableCapabilityClient{
executableClient: newExecutableClient(brokerExt, conn),
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
Expand All @@ -52,7 +53,7 @@ type CombinedCapabilityClient struct {
*triggerExecutableClient
}

func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
return &CombinedCapabilityClient{
executableClient: newExecutableClient(brokerExt, conn),
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
Expand Down Expand Up @@ -135,14 +136,18 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
}

type baseCapabilityClient struct {
c net.ClientConnInterface
grpc capabilitiespb.BaseCapabilityClient
*net.BrokerExt
}

var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)

func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *baseCapabilityClient {
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) *baseCapabilityClient {
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
}
func (c *baseCapabilityClient) GetState() connectivity.State {
return c.c.GetState()
}

func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
Expand Down
Loading
Loading