Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func TestReduceAggregator_Aggregate(t *testing.T) {
"BenchmarkPrice": uint64(100),
"Timestamp": 12341414929,
})
require.NoError(t, err)
mockValueWithNil.Underlying["BenchmarkPrice"] = nil // simulate failed wraping of uint64
return map[commontypes.OracleID][]values.Value{1: {mockValue}, 2: {mockValue}, 3: {mockValue}, 4: {mockValueWithNil}}
},
Expand Down
13 changes: 5 additions & 8 deletions pkg/capabilities/consensus/ocr3/ocr3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocr3

import (
"context"
"errors"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -101,15 +102,11 @@ func (o *Capability) NewValidationService(ctx context.Context) (core.ValidationS
}

func (o *Capability) Close() error {
o.Plugin.Close()
err := o.Plugin.Close()

if o.capabilityRegistry == nil {
return nil
if o.capabilityRegistry != nil {
err = errors.Join(err, o.capabilityRegistry.Remove(context.TODO(), o.config.capability.ID))
}

if err := o.capabilityRegistry.Remove(context.TODO(), o.config.capability.ID); err != nil {
return err
}

return nil
return err
}
1 change: 1 addition & 0 deletions pkg/capabilities/consensus/requests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestOCR3Store(t *testing.T) {

t.Run("rangeN", func(t *testing.T) {
err := s.Add(&ocr3.ReportRequest{WorkflowExecutionID: uuid.New().String(), ExpiresAt: n.Add(1 * time.Hour)})
require.NoError(t, err)
r, err := s.RangeN(0, 1)
assert.NoError(t, err)
assert.Len(t, r, 1)
Expand Down
115 changes: 57 additions & 58 deletions pkg/loop/internal/core/services/capability/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
"github.com/smartcontractkit/chainlink-protos/cre/go/values"
)
Expand Down Expand Up @@ -67,8 +66,8 @@ func RegisterExecutableCapabilityServer(server *grpc.Server, broker net.Broker,
BrokerConfig: brokerCfg,
Broker: broker,
}
capabilitiespb.RegisterExecutableServer(server, newExecutableServer(bext, impl))
capabilitiespb.RegisterBaseCapabilityServer(server, newBaseCapabilityServer(impl))
pb.RegisterExecutableServer(server, newExecutableServer(bext, impl))
pb.RegisterBaseCapabilityServer(server, newBaseCapabilityServer(impl))
return nil
}

Expand All @@ -77,13 +76,13 @@ func RegisterTriggerCapabilityServer(server *grpc.Server, broker net.Broker, bro
BrokerConfig: brokerCfg,
Broker: broker,
}
capabilitiespb.RegisterTriggerExecutableServer(server, newTriggerExecutableServer(bext, impl))
capabilitiespb.RegisterBaseCapabilityServer(server, newBaseCapabilityServer(impl))
pb.RegisterTriggerExecutableServer(server, newTriggerExecutableServer(bext, impl))
pb.RegisterBaseCapabilityServer(server, newBaseCapabilityServer(impl))
return nil
}

type baseCapabilityServer struct {
capabilitiespb.UnimplementedBaseCapabilityServer
pb.UnimplementedBaseCapabilityServer

impl capabilities.BaseCapability
}
Expand All @@ -92,9 +91,9 @@ func newBaseCapabilityServer(impl capabilities.BaseCapability) *baseCapabilitySe
return &baseCapabilityServer{impl: impl}
}

var _ capabilitiespb.BaseCapabilityServer = (*baseCapabilityServer)(nil)
var _ pb.BaseCapabilityServer = (*baseCapabilityServer)(nil)

func (c *baseCapabilityServer) Info(ctx context.Context, request *emptypb.Empty) (*capabilitiespb.CapabilityInfoReply, error) {
func (c *baseCapabilityServer) Info(ctx context.Context, request *emptypb.Empty) (*pb.CapabilityInfoReply, error) {
info, err := c.impl.Info(ctx)
if err != nil {
return nil, err
Expand All @@ -103,31 +102,31 @@ func (c *baseCapabilityServer) Info(ctx context.Context, request *emptypb.Empty)
return InfoToReply(info), nil
}

func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInfoReply {
var ct capabilitiespb.CapabilityType
func InfoToReply(info capabilities.CapabilityInfo) *pb.CapabilityInfoReply {
var ct pb.CapabilityType
switch info.CapabilityType {
case capabilities.CapabilityTypeTrigger:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_TRIGGER
ct = pb.CapabilityType_CAPABILITY_TYPE_TRIGGER
case capabilities.CapabilityTypeAction:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_ACTION
ct = pb.CapabilityType_CAPABILITY_TYPE_ACTION
case capabilities.CapabilityTypeConsensus:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_CONSENSUS
ct = pb.CapabilityType_CAPABILITY_TYPE_CONSENSUS
case capabilities.CapabilityTypeTarget:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_TARGET
ct = pb.CapabilityType_CAPABILITY_TYPE_TARGET
case capabilities.CapabilityTypeCombined:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_COMBINED
ct = pb.CapabilityType_CAPABILITY_TYPE_COMBINED
case capabilities.CapabilityTypeUnknown:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_UNKNOWN
ct = pb.CapabilityType_CAPABILITY_TYPE_UNKNOWN
default:
ct = capabilitiespb.CapabilityType_CAPABILITY_TYPE_UNKNOWN
ct = pb.CapabilityType_CAPABILITY_TYPE_UNKNOWN
}

types := make([]string, len(info.SpendTypes))
for idx, sType := range info.SpendTypes {
types[idx] = string(sType)
}

return &capabilitiespb.CapabilityInfoReply{
return &pb.CapabilityInfoReply{
Id: info.ID,
CapabilityType: ct,
Description: info.Description,
Expand All @@ -138,14 +137,14 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf

type baseCapabilityClient struct {
c net.ClientConnInterface
grpc capabilitiespb.BaseCapabilityClient
grpc pb.BaseCapabilityClient
*net.BrokerExt
}

var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)

func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) *baseCapabilityClient {
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
return &baseCapabilityClient{c: conn, grpc: pb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
}
func (c *baseCapabilityClient) GetState() connectivity.State {
return c.c.GetState()
Expand All @@ -160,20 +159,20 @@ func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.Capabilit
return InfoReplyToInfo(reply)
}

func InfoReplyToInfo(resp *capabilitiespb.CapabilityInfoReply) (capabilities.CapabilityInfo, error) {
func InfoReplyToInfo(resp *pb.CapabilityInfoReply) (capabilities.CapabilityInfo, error) {
var ct capabilities.CapabilityType
switch resp.CapabilityType {
case capabilitiespb.CapabilityTypeTrigger:
case pb.CapabilityTypeTrigger:
ct = capabilities.CapabilityTypeTrigger
case capabilitiespb.CapabilityTypeAction:
case pb.CapabilityTypeAction:
ct = capabilities.CapabilityTypeAction
case capabilitiespb.CapabilityTypeConsensus:
case pb.CapabilityTypeConsensus:
ct = capabilities.CapabilityTypeConsensus
case capabilitiespb.CapabilityTypeTarget:
case pb.CapabilityTypeTarget:
ct = capabilities.CapabilityTypeTarget
case capabilitiespb.CapabilityTypeCombined:
case pb.CapabilityTypeCombined:
ct = capabilities.CapabilityTypeCombined
case capabilitiespb.CapabilityTypeUnknown:
case pb.CapabilityTypeUnknown:
return capabilities.CapabilityInfo{}, fmt.Errorf("invalid capability type: %s", ct)
}

Expand All @@ -192,7 +191,7 @@ func InfoReplyToInfo(resp *capabilitiespb.CapabilityInfoReply) (capabilities.Cap
}

type triggerExecutableServer struct {
capabilitiespb.UnimplementedTriggerExecutableServer
pb.UnimplementedTriggerExecutableServer
*net.BrokerExt

impl capabilities.TriggerExecutable
Expand All @@ -205,17 +204,17 @@ func newTriggerExecutableServer(brokerExt *net.BrokerExt, impl capabilities.Trig
}
}

var _ capabilitiespb.TriggerExecutableServer = (*triggerExecutableServer)(nil)
var _ pb.TriggerExecutableServer = (*triggerExecutableServer)(nil)

func (t *triggerExecutableServer) AckEvent(ctx context.Context, req *capabilitiespb.AckEventRequest) (*emptypb.Empty, error) {
func (t *triggerExecutableServer) AckEvent(ctx context.Context, req *pb.AckEventRequest) (*emptypb.Empty, error) {
if err := t.impl.AckEvent(ctx, req.TriggerId, req.EventId, req.Method); err != nil {
return nil, fmt.Errorf("error acking event: %w", err)
}
return &emptypb.Empty{}, nil
}

func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.TriggerRegistrationRequest,
server capabilitiespb.TriggerExecutable_RegisterTriggerServer) error {
func (t *triggerExecutableServer) RegisterTrigger(request *pb.TriggerRegistrationRequest,
server pb.TriggerExecutable_RegisterTriggerServer) error {
req, err := pb.TriggerRegistrationRequestFromProto(request)
if err != nil {
return fmt.Errorf("could not unmarshal capability request: %w", err)
Expand All @@ -231,9 +230,9 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
if errors.As(err, &capErr) {
errorString = capErr.SerializeToString()
}
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Response{
Response: &capabilitiespb.TriggerResponse{
msg := &pb.TriggerResponseMessage{
Message: &pb.TriggerResponseMessage_Response{
Response: &pb.TriggerResponse{
Error: errorString,
},
},
Expand All @@ -242,8 +241,8 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
}

// Send ACK response to client
msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Ack{
msg := &pb.TriggerResponseMessage{
Message: &pb.TriggerResponseMessage_Ack{
Ack: &emptypb.Empty{},
},
}
Expand All @@ -268,8 +267,8 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
return nil
}

msg := &capabilitiespb.TriggerResponseMessage{
Message: &capabilitiespb.TriggerResponseMessage_Response{
msg := &pb.TriggerResponseMessage{
Message: &pb.TriggerResponseMessage_Response{
Response: pb.TriggerResponseToProto(resp),
},
}
Expand All @@ -280,7 +279,7 @@ func (t *triggerExecutableServer) RegisterTrigger(request *capabilitiespb.Trigge
}
}

func (t *triggerExecutableServer) UnregisterTrigger(ctx context.Context, request *capabilitiespb.TriggerRegistrationRequest) (*emptypb.Empty, error) {
func (t *triggerExecutableServer) UnregisterTrigger(ctx context.Context, request *pb.TriggerRegistrationRequest) (*emptypb.Empty, error) {
req, err := pb.TriggerRegistrationRequestFromProto(request)
if err != nil {
return nil, fmt.Errorf("could not unmarshal capability request: %w", err)
Expand All @@ -293,7 +292,7 @@ func (t *triggerExecutableServer) UnregisterTrigger(ctx context.Context, request
}

type triggerExecutableClient struct {
grpc capabilitiespb.TriggerExecutableClient
grpc pb.TriggerExecutableClient
*net.BrokerExt

// manage cancelation of gRPC client stream by trigger ID
Expand All @@ -302,7 +301,7 @@ type triggerExecutableClient struct {
}

func (t *triggerExecutableClient) AckEvent(ctx context.Context, triggerId string, eventId string, method string) error {
req := &capabilitiespb.AckEventRequest{
req := &pb.AckEventRequest{
TriggerId: triggerId,
EventId: eventId,
Method: method,
Expand Down Expand Up @@ -383,14 +382,14 @@ func (t *triggerExecutableClient) UnregisterTrigger(ctx context.Context, req cap

func newTriggerExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *triggerExecutableClient {
return &triggerExecutableClient{
grpc: capabilitiespb.NewTriggerExecutableClient(conn),
grpc: pb.NewTriggerExecutableClient(conn),
BrokerExt: brokerExt,
cancelFuncs: make(map[string]func()),
}
}

type executableServer struct {
capabilitiespb.UnimplementedExecutableServer
pb.UnimplementedExecutableServer
*net.BrokerExt

impl capabilities.Executable
Expand All @@ -406,9 +405,9 @@ func newExecutableServer(brokerExt *net.BrokerExt, impl capabilities.Executable)
}
}

var _ capabilitiespb.ExecutableServer = (*executableServer)(nil)
var _ pb.ExecutableServer = (*executableServer)(nil)

func (c *executableServer) RegisterToWorkflow(ctx context.Context, req *capabilitiespb.RegisterToWorkflowRequest) (*emptypb.Empty, error) {
func (c *executableServer) RegisterToWorkflow(ctx context.Context, req *pb.RegisterToWorkflowRequest) (*emptypb.Empty, error) {
config, err := values.FromMapValueProto(req.Config)
if err != nil {
return nil, fmt.Errorf("could not unmarshal config into map: %w", err)
Expand All @@ -424,7 +423,7 @@ func (c *executableServer) RegisterToWorkflow(ctx context.Context, req *capabili
return &emptypb.Empty{}, err
}

func (c *executableServer) UnregisterFromWorkflow(ctx context.Context, req *capabilitiespb.UnregisterFromWorkflowRequest) (*emptypb.Empty, error) {
func (c *executableServer) UnregisterFromWorkflow(ctx context.Context, req *pb.UnregisterFromWorkflowRequest) (*emptypb.Empty, error) {
config, err := values.FromMapValueProto(req.Config)
if err != nil {
return nil, fmt.Errorf("could not unmarshal config into map: %w", err)
Expand All @@ -440,22 +439,22 @@ func (c *executableServer) UnregisterFromWorkflow(ctx context.Context, req *capa
return &emptypb.Empty{}, err
}

func (c *executableServer) Execute(reqpb *capabilitiespb.CapabilityRequest, server capabilitiespb.Executable_ExecuteServer) error {
func (c *executableServer) Execute(reqpb *pb.CapabilityRequest, server pb.Executable_ExecuteServer) error {
req, err := pb.CapabilityRequestFromProto(reqpb)
if err != nil {
return fmt.Errorf("could not unmarshal capability request: %w", err)
}

var responseMessage *capabilitiespb.CapabilityResponse
var responseMessage *pb.CapabilityResponse
response, err := c.impl.Execute(server.Context(), req)
if err != nil {
var capabilityError caperrors.Error
if errors.As(err, &capabilityError) {
responseMessage = &capabilitiespb.CapabilityResponse{Error: capabilityError.SerializeToString()}
responseMessage = &pb.CapabilityResponse{Error: capabilityError.SerializeToString()}
} else {
// All other errors are treated as private visibility and are marked as such to prevent accidental or malicious
// reporting of sensitive information by prefixing the error message with the remote reportable identifier.
responseMessage = &capabilitiespb.CapabilityResponse{Error: caperrors.PrePendPrivateVisibilityIdentifier(err.Error())}
responseMessage = &pb.CapabilityResponse{Error: caperrors.PrePendPrivateVisibilityIdentifier(err.Error())}
}
} else {
responseMessage = pb.CapabilityResponseToProto(response)
Expand All @@ -469,13 +468,13 @@ func (c *executableServer) Execute(reqpb *capabilitiespb.CapabilityRequest, serv
}

type executableClient struct {
grpc capabilitiespb.ExecutableClient
grpc pb.ExecutableClient
*net.BrokerExt
}

func newExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *executableClient {
return &executableClient{
grpc: capabilitiespb.NewExecutableClient(conn),
grpc: pb.NewExecutableClient(conn),
BrokerExt: brokerExt,
}
}
Expand Down Expand Up @@ -511,9 +510,9 @@ func (c *executableClient) UnregisterFromWorkflow(ctx context.Context, req capab
config = req.Config
}

r := &capabilitiespb.UnregisterFromWorkflowRequest{
r := &pb.UnregisterFromWorkflowRequest{
Config: values.ProtoMap(config),
Metadata: &capabilitiespb.RegistrationMetadata{
Metadata: &pb.RegistrationMetadata{
WorkflowId: req.Metadata.WorkflowID,
ReferenceId: req.Metadata.ReferenceID,
},
Expand All @@ -529,9 +528,9 @@ func (c *executableClient) RegisterToWorkflow(ctx context.Context, req capabilit
config = req.Config
}

r := &capabilitiespb.RegisterToWorkflowRequest{
r := &pb.RegisterToWorkflowRequest{
Config: values.ProtoMap(config),
Metadata: &capabilitiespb.RegistrationMetadata{
Metadata: &pb.RegistrationMetadata{
WorkflowId: req.Metadata.WorkflowID,
ReferenceId: req.Metadata.ReferenceID,
},
Expand All @@ -543,7 +542,7 @@ func (c *executableClient) RegisterToWorkflow(ctx context.Context, req capabilit

func forwardTriggerResponsesToChannel(
ctx context.Context,
receive func() (*capabilitiespb.TriggerResponseMessage, error),
receive func() (*pb.TriggerResponseMessage, error),
) (<-chan capabilities.TriggerResponse, error) {
responseCh := make(chan capabilities.TriggerResponse)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ type staticExecutePluginCodec struct {
}

func newStaticExecutePluginCodec(lggr logger.Logger, cfg staticExecutePluginCodecConfig) staticExecutePluginCodec {
lggr = logger.Named(lggr, "staticExecutePluginCodec")
return staticExecutePluginCodec{
staticExecutePluginCodecConfig: cfg,
}
Expand Down
Loading
Loading