-
Notifications
You must be signed in to change notification settings - Fork 28
pkg/capabilities: support replacing registered capabilities after shutdown #1639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,8 +6,11 @@ import ( | |
| "fmt" | ||
| "strings" | ||
| "sync" | ||
| "sync/atomic" | ||
|
|
||
| "github.com/Masterminds/semver/v3" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/connectivity" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
|
|
@@ -18,8 +21,22 @@ var ( | |
| ErrCapabilityAlreadyExists = errors.New("capability already exists") | ||
| ) | ||
|
|
||
| // atomicBaseCapability extends [capabilities.BaseCapability] to support atomic updates and forward client state checks. | ||
| type atomicBaseCapability interface { | ||
| capabilities.BaseCapability | ||
| Update(capabilities.BaseCapability) error | ||
| StateGetter | ||
| } | ||
|
|
||
| var _ StateGetter = (*grpc.ClientConn)(nil) | ||
|
|
||
| // StateGetter is implemented by GRPC client connections. | ||
| type StateGetter interface { | ||
| GetState() connectivity.State | ||
| } | ||
|
|
||
| type baseRegistry struct { | ||
| m map[string]capabilities.BaseCapability | ||
| m map[string]atomicBaseCapability | ||
| lggr logger.Logger | ||
| mu sync.RWMutex | ||
| } | ||
|
|
@@ -28,7 +45,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil) | |
|
|
||
| func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase { | ||
| return &baseRegistry{ | ||
| m: map[string]capabilities.BaseCapability{}, | ||
| m: map[string]atomicBaseCapability{}, | ||
| lggr: logger.Named(lggr, "registries.basic"), | ||
| } | ||
| } | ||
|
|
@@ -142,46 +159,243 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e | |
| return err | ||
| } | ||
|
|
||
| switch info.CapabilityType { | ||
| case capabilities.CapabilityTypeTrigger: | ||
| _, ok := c.(capabilities.TriggerCapability) | ||
| if !ok { | ||
| return errors.New("trigger capability does not satisfy TriggerCapability interface") | ||
| id := info.ID | ||
| bc, ok := r.m[id] | ||
| if ok { | ||
| switch state := bc.GetState(); state { | ||
| case connectivity.Shutdown, connectivity.TransientFailure, connectivity.Idle: | ||
| // allow replace | ||
| default: | ||
| return fmt.Errorf("%w: id %s found in registry: state %s", ErrCapabilityAlreadyExists, id, state) | ||
| } | ||
| case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget: | ||
| _, ok := c.(capabilities.ExecutableCapability) | ||
| if !ok { | ||
| return errors.New("action does not satisfy ExecutableCapability interface") | ||
| if err := bc.Update(c); err != nil { | ||
| return fmt.Errorf("failed to update capability %s: %w", id, err) | ||
| } | ||
| case capabilities.CapabilityTypeCombined: | ||
| _, ok := c.(capabilities.ExecutableAndTriggerCapability) | ||
| if !ok { | ||
| return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface") | ||
| } else { | ||
| var ac atomicBaseCapability | ||
| switch info.CapabilityType { | ||
| case capabilities.CapabilityTypeTrigger: | ||
| ac = &atomicTriggerCapability{} | ||
| case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget: | ||
| ac = &atomicExecuteCapability{} | ||
| case capabilities.CapabilityTypeCombined: | ||
| ac = &atomicExecuteAndTriggerCapability{} | ||
| default: | ||
| return fmt.Errorf("unknown capability type: %s", info.CapabilityType) | ||
| } | ||
| default: | ||
| return fmt.Errorf("unknown capability type: %s", info.CapabilityType) | ||
| } | ||
|
|
||
| id := info.ID | ||
| _, ok := r.m[id] | ||
| if ok { | ||
| return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id) | ||
| if err := ac.Update(c); err != nil { | ||
| return err | ||
| } | ||
| r.m[id] = ac | ||
| } | ||
|
|
||
| r.m[id] = c | ||
| r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version()) | ||
| return nil | ||
| } | ||
|
|
||
| func (r *baseRegistry) Remove(_ context.Context, id string) error { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
| _, ok := r.m[id] | ||
| ac, ok := r.m[id] | ||
| if !ok { | ||
| return fmt.Errorf("unable to remove, capability not found: %s", id) | ||
| } | ||
|
|
||
| delete(r.m, id) | ||
| if err := ac.Update(nil); err != nil { | ||
| return fmt.Errorf("failed to remove capability %s: %w", id, err) | ||
| } | ||
| r.lggr.Infow("capability removed", "id", id) | ||
| return nil | ||
| } | ||
|
|
||
| var _ capabilities.TriggerCapability = &atomicTriggerCapability{} | ||
|
|
||
| type atomicTriggerCapability struct { | ||
| atomic.Pointer[capabilities.TriggerCapability] | ||
| } | ||
|
|
||
| func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error { | ||
| if c == nil { | ||
| a.Store(nil) | ||
| return nil | ||
| } | ||
| tc, ok := c.(capabilities.TriggerCapability) | ||
| if !ok { | ||
| return errors.New("trigger capability does not satisfy TriggerCapability interface") | ||
| } | ||
| a.Store(&tc) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return capabilities.CapabilityInfo{}, errors.New("capability unavailable") | ||
| } | ||
| return (*c).Info(ctx) | ||
| } | ||
|
|
||
| func (a *atomicTriggerCapability) GetState() connectivity.State { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return connectivity.Shutdown | ||
| } | ||
| if sg, ok := (*c).(StateGetter); ok { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need the type cast check here but not on 252, 260? |
||
| return sg.GetState() | ||
| } | ||
| return connectivity.State(-1) // unknown | ||
| } | ||
|
|
||
| func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return nil, errors.New("capability unavailable") | ||
| } | ||
| return (*c).RegisterTrigger(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).UnregisterTrigger(ctx, request) | ||
| } | ||
|
|
||
| var _ capabilities.ExecutableCapability = &atomicExecuteCapability{} | ||
|
|
||
| type atomicExecuteCapability struct { | ||
| atomic.Pointer[capabilities.ExecutableCapability] | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error { | ||
| if c == nil { | ||
| a.Store(nil) | ||
| return nil | ||
| } | ||
| tc, ok := c.(capabilities.ExecutableCapability) | ||
| if !ok { | ||
| return errors.New("action does not satisfy ExecutableCapability interface") | ||
| } | ||
| a.Store(&tc) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return capabilities.CapabilityInfo{}, errors.New("capability unavailable") | ||
| } | ||
| return (*c).Info(ctx) | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) GetState() connectivity.State { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return connectivity.Shutdown | ||
| } | ||
| if sg, ok := (*c).(StateGetter); ok { | ||
| return sg.GetState() | ||
| } | ||
| return connectivity.State(-1) // unknown | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).RegisterToWorkflow(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).UnregisterFromWorkflow(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return capabilities.CapabilityResponse{}, errors.New("capability unavailable") | ||
| } | ||
| return (*c).Execute(ctx, request) | ||
| } | ||
|
|
||
| var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{} | ||
|
|
||
| type atomicExecuteAndTriggerCapability struct { | ||
| atomic.Pointer[capabilities.ExecutableAndTriggerCapability] | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error { | ||
| if c == nil { | ||
| a.Store(nil) | ||
| return nil | ||
| } | ||
| tc, ok := c.(capabilities.ExecutableAndTriggerCapability) | ||
| if !ok { | ||
| return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface") | ||
| } | ||
| a.Store(&tc) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return capabilities.CapabilityInfo{}, errors.New("capability unavailable") | ||
| } | ||
| return (*c).Info(ctx) | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return connectivity.Shutdown | ||
| } | ||
| if sg, ok := (*c).(StateGetter); ok { | ||
| return sg.GetState() | ||
| } | ||
| return connectivity.State(-1) // unknown | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return nil, errors.New("capability unavailable") | ||
| } | ||
| return (*c).RegisterTrigger(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).UnregisterTrigger(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).RegisterToWorkflow(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return errors.New("capability unavailable") | ||
| } | ||
| return (*c).UnregisterFromWorkflow(ctx, request) | ||
| } | ||
|
|
||
| func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { | ||
| c := a.Load() | ||
| if c == nil { | ||
| return capabilities.CapabilityResponse{}, errors.New("capability unavailable") | ||
| } | ||
| return (*c).Execute(ctx, request) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to include
Idlehere, but it is apparently necessary.