Skip to content

Commit 940dec7

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent d611017 commit 940dec7

5 files changed

Lines changed: 246 additions & 10 deletions

File tree

pkg/capabilities/capabilities.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
12+
"google.golang.org/grpc/connectivity"
1213
"google.golang.org/protobuf/proto"
1314
"google.golang.org/protobuf/types/known/anypb"
1415

@@ -209,6 +210,7 @@ type Validatable interface {
209210
// or extension in the future.
210211
type BaseCapability interface {
211212
Info(ctx context.Context) (CapabilityInfo, error)
213+
GetState() connectivity.State
212214
}
213215

214216
type TriggerRegistrationRequest struct {
@@ -397,6 +399,11 @@ type CapabilityInfo struct {
397399
SpendTypes []CapabilitySpendType
398400
}
399401

402+
// GetState is included to implement BaseCapability.
403+
func (c CapabilityInfo) GetState() connectivity.State {
404+
return connectivity.Idle
405+
}
406+
400407
// Parse out the version from the ID.
401408
func (c CapabilityInfo) Version() string {
402409
return c.ID[strings.Index(c.ID, "@")+1:]

pkg/capabilities/registry/base.go

Lines changed: 221 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc/connectivity"
1113

1214
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1315
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +20,13 @@ var (
1820
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1921
)
2022

23+
type atomicBaseCapability interface {
24+
capabilities.BaseCapability
25+
Update(capabilities.BaseCapability) error
26+
}
27+
2128
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
29+
m map[string]atomicBaseCapability
2330
lggr logger.Logger
2431
mu sync.RWMutex
2532
}
@@ -28,7 +35,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2835

2936
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3037
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
38+
m: map[string]atomicBaseCapability{},
3239
lggr: logger.Named(lggr, "registries.basic"),
3340
}
3441
}
@@ -163,25 +170,230 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
163170
}
164171

165172
id := info.ID
166-
_, ok := r.m[id]
173+
bc, ok := r.m[id]
167174
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
175+
if bc.GetState() != connectivity.Shutdown {
176+
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
177+
}
178+
if err := bc.Update(c); err != nil {
179+
return fmt.Errorf("failed to update capability %s: %w", id, err)
180+
}
181+
} else {
182+
var ac atomicBaseCapability
183+
switch info.CapabilityType {
184+
case capabilities.CapabilityTypeTrigger:
185+
ac = &atomicTriggerCapability{}
186+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
187+
ac = &atomicExecuteCapability{}
188+
case capabilities.CapabilityTypeCombined:
189+
ac = &atomicExecuteAndTriggerCapability{}
190+
default:
191+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
192+
}
193+
if err := ac.Update(c); err != nil {
194+
return err
195+
}
196+
r.m[id] = ac
169197
}
170-
171-
r.m[id] = c
172198
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173199
return nil
174200
}
175201

176202
func (r *baseRegistry) Remove(_ context.Context, id string) error {
177203
r.mu.Lock()
178204
defer r.mu.Unlock()
179-
_, ok := r.m[id]
205+
ac, ok := r.m[id]
180206
if !ok {
181207
return fmt.Errorf("unable to remove, capability not found: %s", id)
182208
}
183-
184-
delete(r.m, id)
209+
if err := ac.Update(nil); err != nil {
210+
return fmt.Errorf("failed to remove capability %s: %w", id, err)
211+
}
185212
r.lggr.Infow("capability removed", "id", id)
186213
return nil
187214
}
215+
216+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
217+
218+
type atomicTriggerCapability struct {
219+
atomic.Pointer[capabilities.TriggerCapability]
220+
}
221+
222+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
223+
if c == nil {
224+
a.Store(nil)
225+
return nil
226+
}
227+
tc, ok := c.(capabilities.TriggerCapability)
228+
if !ok {
229+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
230+
}
231+
a.Store(&tc)
232+
return nil
233+
}
234+
235+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
236+
c := a.Load()
237+
if c == nil {
238+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
239+
}
240+
return (*c).Info(ctx)
241+
}
242+
243+
func (a *atomicTriggerCapability) GetState() connectivity.State {
244+
c := a.Load()
245+
if c == nil {
246+
return connectivity.Shutdown
247+
}
248+
return (*c).GetState()
249+
}
250+
251+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
252+
c := a.Load()
253+
if c == nil {
254+
return nil, errors.New("capability unavailable")
255+
}
256+
return (*c).RegisterTrigger(ctx, request)
257+
}
258+
259+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
260+
c := a.Load()
261+
if c == nil {
262+
return errors.New("capability unavailable")
263+
}
264+
return (*c).UnregisterTrigger(ctx, request)
265+
}
266+
267+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
268+
269+
type atomicExecuteCapability struct {
270+
atomic.Pointer[capabilities.ExecutableCapability]
271+
}
272+
273+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
274+
if c == nil {
275+
a.Store(nil)
276+
return nil
277+
}
278+
tc, ok := c.(capabilities.ExecutableCapability)
279+
if !ok {
280+
return errors.New("action does not satisfy ExecutableCapability interface")
281+
}
282+
a.Store(&tc)
283+
return nil
284+
}
285+
286+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
287+
c := a.Load()
288+
if c == nil {
289+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
290+
}
291+
return (*c).Info(ctx)
292+
}
293+
294+
func (a *atomicExecuteCapability) GetState() connectivity.State {
295+
c := a.Load()
296+
if c == nil {
297+
return connectivity.Shutdown
298+
}
299+
return (*c).GetState()
300+
}
301+
302+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
303+
c := a.Load()
304+
if c == nil {
305+
return errors.New("capability unavailable")
306+
}
307+
return (*c).RegisterToWorkflow(ctx, request)
308+
}
309+
310+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
311+
c := a.Load()
312+
if c == nil {
313+
return errors.New("capability unavailable")
314+
}
315+
return (*c).UnregisterFromWorkflow(ctx, request)
316+
}
317+
318+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
319+
c := a.Load()
320+
if c == nil {
321+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
322+
}
323+
return (*c).Execute(ctx, request)
324+
}
325+
326+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
327+
328+
type atomicExecuteAndTriggerCapability struct {
329+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
330+
}
331+
332+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
333+
if c == nil {
334+
a.Store(nil)
335+
return nil
336+
}
337+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
338+
if !ok {
339+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
340+
}
341+
a.Store(&tc)
342+
return nil
343+
}
344+
345+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
346+
c := a.Load()
347+
if c == nil {
348+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
349+
}
350+
return (*c).Info(ctx)
351+
}
352+
353+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
354+
c := a.Load()
355+
if c == nil {
356+
return connectivity.Shutdown
357+
}
358+
return (*c).GetState()
359+
}
360+
361+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
362+
c := a.Load()
363+
if c == nil {
364+
return nil, errors.New("capability unavailable")
365+
}
366+
return (*c).RegisterTrigger(ctx, request)
367+
}
368+
369+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
370+
c := a.Load()
371+
if c == nil {
372+
return errors.New("capability unavailable")
373+
}
374+
return (*c).UnregisterTrigger(ctx, request)
375+
}
376+
377+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
378+
c := a.Load()
379+
if c == nil {
380+
return errors.New("capability unavailable")
381+
}
382+
return (*c).RegisterToWorkflow(ctx, request)
383+
}
384+
385+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
386+
c := a.Load()
387+
if c == nil {
388+
return errors.New("capability unavailable")
389+
}
390+
return (*c).UnregisterFromWorkflow(ctx, request)
391+
}
392+
393+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
394+
c := a.Load()
395+
if c == nil {
396+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
397+
}
398+
return (*c).Execute(ctx, request)
399+
}

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -135,14 +136,19 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c *grpc.ClientConn
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144146
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
150+
func (c *baseCapabilityClient) GetState() connectivity.State {
151+
return c.c.GetState()
146152
}
147153

148154
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

pkg/loop/internal/core/services/capability/capabilities_registry_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/mock"
1212
"github.com/stretchr/testify/require"
1313
"google.golang.org/grpc"
14+
"google.golang.org/grpc/connectivity"
1415

1516
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
1617

@@ -29,6 +30,10 @@ type mockBaseCapability struct {
2930
info capabilities.CapabilityInfo
3031
}
3132

33+
func (f *mockBaseCapability) GetState() connectivity.State {
34+
return connectivity.Idle
35+
}
36+
3237
func (f *mockBaseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3338
return f.info, nil
3439
}

pkg/loop/internal/test/test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package test
33
import (
44
"context"
55

6+
"google.golang.org/grpc/connectivity"
7+
68
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
79
"github.com/smartcontractkit/chainlink-common/pkg/services"
810

@@ -33,6 +35,10 @@ var _ capabilities.BaseCapability = (*baseCapability)(nil)
3335
type baseCapability struct {
3436
}
3537

38+
func (e baseCapability) GetState() connectivity.State {
39+
return connectivity.Idle
40+
}
41+
3642
func (e baseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3743
return CapabilityInfo, nil
3844
}

0 commit comments

Comments
 (0)