Skip to content

Commit 298c913

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent 8f2c438 commit 298c913

7 files changed

Lines changed: 273 additions & 31 deletions

File tree

pkg/capabilities/capabilities.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
12+
"google.golang.org/grpc/connectivity"
1213
"google.golang.org/protobuf/proto"
1314
"google.golang.org/protobuf/types/known/anypb"
1415

@@ -397,6 +398,11 @@ type CapabilityInfo struct {
397398
SpendTypes []CapabilitySpendType
398399
}
399400

401+
// GetState is included to implement BaseCapability.
402+
func (c CapabilityInfo) GetState() connectivity.State {
403+
return connectivity.Idle
404+
}
405+
400406
// Parse out the version from the ID.
401407
func (c CapabilityInfo) Version() string {
402408
return c.ID[strings.Index(c.ID, "@")+1:]

pkg/capabilities/registry/base.go

Lines changed: 237 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/connectivity"
1114

1215
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1316
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +21,20 @@ var (
1821
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1922
)
2023

24+
type atomicBaseCapability interface {
25+
capabilities.BaseCapability
26+
Update(capabilities.BaseCapability) error
27+
StateGetter
28+
}
29+
30+
var _ StateGetter = (*grpc.ClientConn)(nil)
31+
32+
type StateGetter interface {
33+
GetState() connectivity.State
34+
}
35+
2136
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
37+
m map[string]atomicBaseCapability
2338
lggr logger.Logger
2439
mu sync.RWMutex
2540
}
@@ -28,7 +43,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2843

2944
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3045
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
46+
m: map[string]atomicBaseCapability{},
3247
lggr: logger.Named(lggr, "registries.basic"),
3348
}
3449
}
@@ -142,46 +157,240 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
142157
return err
143158
}
144159

145-
switch info.CapabilityType {
146-
case capabilities.CapabilityTypeTrigger:
147-
_, ok := c.(capabilities.TriggerCapability)
148-
if !ok {
149-
return errors.New("trigger capability does not satisfy TriggerCapability interface")
160+
id := info.ID
161+
bc, ok := r.m[id]
162+
if ok {
163+
if bc.GetState() != connectivity.Shutdown {
164+
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
150165
}
151-
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
152-
_, ok := c.(capabilities.ExecutableCapability)
153-
if !ok {
154-
return errors.New("action does not satisfy ExecutableCapability interface")
166+
if err := bc.Update(c); err != nil {
167+
return fmt.Errorf("failed to update capability %s: %w", id, err)
155168
}
156-
case capabilities.CapabilityTypeCombined:
157-
_, ok := c.(capabilities.ExecutableAndTriggerCapability)
158-
if !ok {
159-
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
169+
} else {
170+
var ac atomicBaseCapability
171+
switch info.CapabilityType {
172+
case capabilities.CapabilityTypeTrigger:
173+
ac = &atomicTriggerCapability{}
174+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
175+
ac = &atomicExecuteCapability{}
176+
case capabilities.CapabilityTypeCombined:
177+
ac = &atomicExecuteAndTriggerCapability{}
178+
default:
179+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
160180
}
161-
default:
162-
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
163-
}
164-
165-
id := info.ID
166-
_, ok := r.m[id]
167-
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
181+
if err := ac.Update(c); err != nil {
182+
return err
183+
}
184+
r.m[id] = ac
169185
}
170-
171-
r.m[id] = c
172186
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173187
return nil
174188
}
175189

176190
func (r *baseRegistry) Remove(_ context.Context, id string) error {
177191
r.mu.Lock()
178192
defer r.mu.Unlock()
179-
_, ok := r.m[id]
193+
ac, ok := r.m[id]
180194
if !ok {
181195
return fmt.Errorf("unable to remove, capability not found: %s", id)
182196
}
183-
184-
delete(r.m, id)
197+
if err := ac.Update(nil); err != nil {
198+
return fmt.Errorf("failed to remove capability %s: %w", id, err)
199+
}
185200
r.lggr.Infow("capability removed", "id", id)
186201
return nil
187202
}
203+
204+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
205+
206+
type atomicTriggerCapability struct {
207+
atomic.Pointer[capabilities.TriggerCapability]
208+
}
209+
210+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
211+
if c == nil {
212+
a.Store(nil)
213+
return nil
214+
}
215+
tc, ok := c.(capabilities.TriggerCapability)
216+
if !ok {
217+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
218+
}
219+
a.Store(&tc)
220+
return nil
221+
}
222+
223+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
224+
c := a.Load()
225+
if c == nil {
226+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
227+
}
228+
return (*c).Info(ctx)
229+
}
230+
231+
func (a *atomicTriggerCapability) GetState() connectivity.State {
232+
c := a.Load()
233+
if c == nil {
234+
return connectivity.Shutdown
235+
}
236+
if sg, ok := (*c).(StateGetter); ok {
237+
return sg.GetState()
238+
}
239+
return connectivity.State(-1) // unknown
240+
}
241+
242+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
243+
c := a.Load()
244+
if c == nil {
245+
return nil, errors.New("capability unavailable")
246+
}
247+
return (*c).RegisterTrigger(ctx, request)
248+
}
249+
250+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
251+
c := a.Load()
252+
if c == nil {
253+
return errors.New("capability unavailable")
254+
}
255+
return (*c).UnregisterTrigger(ctx, request)
256+
}
257+
258+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
259+
260+
type atomicExecuteCapability struct {
261+
atomic.Pointer[capabilities.ExecutableCapability]
262+
}
263+
264+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
265+
if c == nil {
266+
a.Store(nil)
267+
return nil
268+
}
269+
tc, ok := c.(capabilities.ExecutableCapability)
270+
if !ok {
271+
return errors.New("action does not satisfy ExecutableCapability interface")
272+
}
273+
a.Store(&tc)
274+
return nil
275+
}
276+
277+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
278+
c := a.Load()
279+
if c == nil {
280+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
281+
}
282+
return (*c).Info(ctx)
283+
}
284+
285+
func (a *atomicExecuteCapability) GetState() connectivity.State {
286+
c := a.Load()
287+
if c == nil {
288+
return connectivity.Shutdown
289+
}
290+
if sg, ok := (*c).(StateGetter); ok {
291+
return sg.GetState()
292+
}
293+
return connectivity.State(-1) // unknown
294+
}
295+
296+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
297+
c := a.Load()
298+
if c == nil {
299+
return errors.New("capability unavailable")
300+
}
301+
return (*c).RegisterToWorkflow(ctx, request)
302+
}
303+
304+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
305+
c := a.Load()
306+
if c == nil {
307+
return errors.New("capability unavailable")
308+
}
309+
return (*c).UnregisterFromWorkflow(ctx, request)
310+
}
311+
312+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
313+
c := a.Load()
314+
if c == nil {
315+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
316+
}
317+
return (*c).Execute(ctx, request)
318+
}
319+
320+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
321+
322+
type atomicExecuteAndTriggerCapability struct {
323+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
324+
}
325+
326+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
327+
if c == nil {
328+
a.Store(nil)
329+
return nil
330+
}
331+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
332+
if !ok {
333+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
334+
}
335+
a.Store(&tc)
336+
return nil
337+
}
338+
339+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
340+
c := a.Load()
341+
if c == nil {
342+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
343+
}
344+
return (*c).Info(ctx)
345+
}
346+
347+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
348+
c := a.Load()
349+
if c == nil {
350+
return connectivity.Shutdown
351+
}
352+
if sg, ok := (*c).(StateGetter); ok {
353+
return sg.GetState()
354+
}
355+
return connectivity.State(-1) // unknown
356+
}
357+
358+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
359+
c := a.Load()
360+
if c == nil {
361+
return nil, errors.New("capability unavailable")
362+
}
363+
return (*c).RegisterTrigger(ctx, request)
364+
}
365+
366+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
367+
c := a.Load()
368+
if c == nil {
369+
return errors.New("capability unavailable")
370+
}
371+
return (*c).UnregisterTrigger(ctx, request)
372+
}
373+
374+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
375+
c := a.Load()
376+
if c == nil {
377+
return errors.New("capability unavailable")
378+
}
379+
return (*c).RegisterToWorkflow(ctx, request)
380+
}
381+
382+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
383+
c := a.Load()
384+
if c == nil {
385+
return errors.New("capability unavailable")
386+
}
387+
return (*c).UnregisterFromWorkflow(ctx, request)
388+
}
389+
390+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
391+
c := a.Load()
392+
if c == nil {
393+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
394+
}
395+
return (*c).Execute(ctx, request)
396+
}

pkg/capabilities/registry/base_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ func TestRegistry(t *testing.T) {
5050

5151
gc, err := r.Get(ctx, id)
5252
require.NoError(t, err)
53+
info, err := gc.Info(t.Context())
54+
require.NoError(t, err)
5355

54-
assert.Equal(t, c, gc)
56+
assert.Equal(t, c.CapabilityInfo, info)
5557

5658
cs, err := r.List(ctx)
5759
require.NoError(t, err)
5860
assert.Len(t, cs, 1)
59-
assert.Equal(t, c, cs[0])
61+
info, err = cs[0].Info(t.Context())
62+
require.NoError(t, err)
63+
assert.Equal(t, c.CapabilityInfo, info)
6064
}
6165

6266
func TestRegistryCompatibleVersions(t *testing.T) {

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -135,14 +136,20 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c *grpc.ClientConn
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144146
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
150+
//TODO only one?
151+
func (c *baseCapabilityClient) GetState() connectivity.State {
152+
return c.c.GetState()
146153
}
147154

148155
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

pkg/loop/internal/core/services/capability/capabilities_registry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1212
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
13+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/registry"
1314
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
1415
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb"
1516
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
@@ -601,6 +602,10 @@ func (c *capabilitiesRegistryServer) List(ctx context.Context, _ *emptypb.Empty)
601602
return reply, nil
602603
}
603604

605+
var _ registry.StateGetter = (*TriggerCapabilityClient)(nil)
606+
var _ registry.StateGetter = (*ExecutableCapabilityClient)(nil)
607+
var _ registry.StateGetter = (*CombinedCapabilityClient)(nil)
608+
604609
func (c *capabilitiesRegistryServer) Add(ctx context.Context, request *pb.AddRequest) (*emptypb.Empty, error) {
605610
conn, err := c.Dial(request.CapabilityID)
606611
if err != nil {

0 commit comments

Comments
 (0)