Skip to content

Commit ee68518

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent 61a523e commit ee68518

10 files changed

Lines changed: 295 additions & 44 deletions

File tree

pkg/capabilities/registry/base.go

Lines changed: 240 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/connectivity"
1114

1215
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1316
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +21,22 @@ var (
1821
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1922
)
2023

24+
// atomicBaseCapability extends [capabilities.BaseCapability] to support atomic updates and forward client state checks.
25+
type atomicBaseCapability interface {
26+
capabilities.BaseCapability
27+
Update(capabilities.BaseCapability) error
28+
StateGetter
29+
}
30+
31+
var _ StateGetter = (*grpc.ClientConn)(nil)
32+
33+
// StateGetter is implemented by GRPC client connections.
34+
type StateGetter interface {
35+
GetState() connectivity.State
36+
}
37+
2138
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
39+
m map[string]atomicBaseCapability
2340
lggr logger.Logger
2441
mu sync.RWMutex
2542
}
@@ -28,7 +45,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2845

2946
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3047
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
48+
m: map[string]atomicBaseCapability{},
3249
lggr: logger.Named(lggr, "registries.basic"),
3350
}
3451
}
@@ -142,46 +159,241 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
142159
return err
143160
}
144161

145-
switch info.CapabilityType {
146-
case capabilities.CapabilityTypeTrigger:
147-
_, ok := c.(capabilities.TriggerCapability)
148-
if !ok {
149-
return errors.New("trigger capability does not satisfy TriggerCapability interface")
162+
id := info.ID
163+
bc, ok := r.m[id]
164+
if ok {
165+
switch state := bc.GetState(); state {
166+
case connectivity.Shutdown, connectivity.TransientFailure:
167+
return fmt.Errorf("%w: id %s found in registry: state %s", ErrCapabilityAlreadyExists, id, state)
150168
}
151-
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
152-
_, ok := c.(capabilities.ExecutableCapability)
153-
if !ok {
154-
return errors.New("action does not satisfy ExecutableCapability interface")
169+
if err := bc.Update(c); err != nil {
170+
return fmt.Errorf("failed to update capability %s: %w", id, err)
155171
}
156-
case capabilities.CapabilityTypeCombined:
157-
_, ok := c.(capabilities.ExecutableAndTriggerCapability)
158-
if !ok {
159-
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
172+
} else {
173+
var ac atomicBaseCapability
174+
switch info.CapabilityType {
175+
case capabilities.CapabilityTypeTrigger:
176+
ac = &atomicTriggerCapability{}
177+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
178+
ac = &atomicExecuteCapability{}
179+
case capabilities.CapabilityTypeCombined:
180+
ac = &atomicExecuteAndTriggerCapability{}
181+
default:
182+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
160183
}
161-
default:
162-
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
163-
}
164-
165-
id := info.ID
166-
_, ok := r.m[id]
167-
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
184+
if err := ac.Update(c); err != nil {
185+
return err
186+
}
187+
r.m[id] = ac
169188
}
170-
171-
r.m[id] = c
172189
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173190
return nil
174191
}
175192

176193
func (r *baseRegistry) Remove(_ context.Context, id string) error {
177194
r.mu.Lock()
178195
defer r.mu.Unlock()
179-
_, ok := r.m[id]
196+
ac, ok := r.m[id]
180197
if !ok {
181198
return fmt.Errorf("unable to remove, capability not found: %s", id)
182199
}
183-
184-
delete(r.m, id)
200+
if err := ac.Update(nil); err != nil {
201+
return fmt.Errorf("failed to remove capability %s: %w", id, err)
202+
}
185203
r.lggr.Infow("capability removed", "id", id)
186204
return nil
187205
}
206+
207+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
208+
209+
type atomicTriggerCapability struct {
210+
atomic.Pointer[capabilities.TriggerCapability]
211+
}
212+
213+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
214+
if c == nil {
215+
a.Store(nil)
216+
return nil
217+
}
218+
tc, ok := c.(capabilities.TriggerCapability)
219+
if !ok {
220+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
221+
}
222+
a.Store(&tc)
223+
return nil
224+
}
225+
226+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
227+
c := a.Load()
228+
if c == nil {
229+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
230+
}
231+
return (*c).Info(ctx)
232+
}
233+
234+
func (a *atomicTriggerCapability) GetState() connectivity.State {
235+
c := a.Load()
236+
if c == nil {
237+
return connectivity.Shutdown
238+
}
239+
if sg, ok := (*c).(StateGetter); ok {
240+
return sg.GetState()
241+
}
242+
return connectivity.State(-1) // unknown
243+
}
244+
245+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
246+
c := a.Load()
247+
if c == nil {
248+
return nil, errors.New("capability unavailable")
249+
}
250+
return (*c).RegisterTrigger(ctx, request)
251+
}
252+
253+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
254+
c := a.Load()
255+
if c == nil {
256+
return errors.New("capability unavailable")
257+
}
258+
return (*c).UnregisterTrigger(ctx, request)
259+
}
260+
261+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
262+
263+
type atomicExecuteCapability struct {
264+
atomic.Pointer[capabilities.ExecutableCapability]
265+
}
266+
267+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
268+
if c == nil {
269+
a.Store(nil)
270+
return nil
271+
}
272+
tc, ok := c.(capabilities.ExecutableCapability)
273+
if !ok {
274+
return errors.New("action does not satisfy ExecutableCapability interface")
275+
}
276+
a.Store(&tc)
277+
return nil
278+
}
279+
280+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
281+
c := a.Load()
282+
if c == nil {
283+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
284+
}
285+
return (*c).Info(ctx)
286+
}
287+
288+
func (a *atomicExecuteCapability) GetState() connectivity.State {
289+
c := a.Load()
290+
if c == nil {
291+
return connectivity.Shutdown
292+
}
293+
if sg, ok := (*c).(StateGetter); ok {
294+
return sg.GetState()
295+
}
296+
return connectivity.State(-1) // unknown
297+
}
298+
299+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
300+
c := a.Load()
301+
if c == nil {
302+
return errors.New("capability unavailable")
303+
}
304+
return (*c).RegisterToWorkflow(ctx, request)
305+
}
306+
307+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
308+
c := a.Load()
309+
if c == nil {
310+
return errors.New("capability unavailable")
311+
}
312+
return (*c).UnregisterFromWorkflow(ctx, request)
313+
}
314+
315+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
316+
c := a.Load()
317+
if c == nil {
318+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
319+
}
320+
return (*c).Execute(ctx, request)
321+
}
322+
323+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
324+
325+
type atomicExecuteAndTriggerCapability struct {
326+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
327+
}
328+
329+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
330+
if c == nil {
331+
a.Store(nil)
332+
return nil
333+
}
334+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
335+
if !ok {
336+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
337+
}
338+
a.Store(&tc)
339+
return nil
340+
}
341+
342+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
343+
c := a.Load()
344+
if c == nil {
345+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
346+
}
347+
return (*c).Info(ctx)
348+
}
349+
350+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
351+
c := a.Load()
352+
if c == nil {
353+
return connectivity.Shutdown
354+
}
355+
if sg, ok := (*c).(StateGetter); ok {
356+
return sg.GetState()
357+
}
358+
return connectivity.State(-1) // unknown
359+
}
360+
361+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
362+
c := a.Load()
363+
if c == nil {
364+
return nil, errors.New("capability unavailable")
365+
}
366+
return (*c).RegisterTrigger(ctx, request)
367+
}
368+
369+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
370+
c := a.Load()
371+
if c == nil {
372+
return errors.New("capability unavailable")
373+
}
374+
return (*c).UnregisterTrigger(ctx, request)
375+
}
376+
377+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
378+
c := a.Load()
379+
if c == nil {
380+
return errors.New("capability unavailable")
381+
}
382+
return (*c).RegisterToWorkflow(ctx, request)
383+
}
384+
385+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
386+
c := a.Load()
387+
if c == nil {
388+
return errors.New("capability unavailable")
389+
}
390+
return (*c).UnregisterFromWorkflow(ctx, request)
391+
}
392+
393+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
394+
c := a.Load()
395+
if c == nil {
396+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
397+
}
398+
return (*c).Execute(ctx, request)
399+
}

pkg/capabilities/registry/base_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ func TestRegistry(t *testing.T) {
5050

5151
gc, err := r.Get(ctx, id)
5252
require.NoError(t, err)
53+
info, err := gc.Info(t.Context())
54+
require.NoError(t, err)
5355

54-
assert.Equal(t, c, gc)
56+
assert.Equal(t, c.CapabilityInfo, info)
5557

5658
cs, err := r.List(ctx)
5759
require.NoError(t, err)
5860
assert.Len(t, cs, 1)
59-
assert.Equal(t, c, cs[0])
61+
info, err = cs[0].Info(t.Context())
62+
require.NoError(t, err)
63+
assert.Equal(t, c.CapabilityInfo, info)
6064
}
6165

6266
func TestRegistryCompatibleVersions(t *testing.T) {

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -22,7 +23,7 @@ type TriggerCapabilityClient struct {
2223
*baseCapabilityClient
2324
}
2425

25-
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) capabilities.TriggerCapability {
26+
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) capabilities.TriggerCapability {
2627
return &TriggerCapabilityClient{
2728
triggerExecutableClient: newTriggerExecutableClient(brokerExt, conn),
2829
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -39,7 +40,7 @@ type ExecutableCapability interface {
3940
capabilities.BaseCapability
4041
}
4142

42-
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
43+
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
4344
return &ExecutableCapabilityClient{
4445
executableClient: newExecutableClient(brokerExt, conn),
4546
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -52,7 +53,7 @@ type CombinedCapabilityClient struct {
5253
*triggerExecutableClient
5354
}
5455

55-
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
56+
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
5657
return &CombinedCapabilityClient{
5758
executableClient: newExecutableClient(brokerExt, conn),
5859
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -135,14 +136,18 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c net.ClientConnInterface
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144-
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
146+
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) *baseCapabilityClient {
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
func (c *baseCapabilityClient) GetState() connectivity.State {
150+
return c.c.GetState()
146151
}
147152

148153
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

0 commit comments

Comments
 (0)