Skip to content

Commit db01bba

Browse files
Merge origin/develop into codex/vault-request-authorizer-in-gw
2 parents 712c362 + 544f9aa commit db01bba

33 files changed

Lines changed: 774 additions & 185 deletions

File tree

core/capabilities/remote/executable/request/server_request.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ func (e *ServerRequest) Expired() bool {
208208
return time.Since(e.createdTime) > e.requestTimeout
209209
}
210210

211+
func (e *ServerRequest) Evictable(minRetention time.Duration) bool {
212+
age := time.Since(e.createdTime)
213+
return age > e.requestTimeout && age > minRetention
214+
}
215+
211216
func (e *ServerRequest) Cancel(ctx context.Context, err types.Error, msg string) error {
212217
e.mux.Lock()
213218
defer e.mux.Unlock()

core/capabilities/remote/executable/request/server_request_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,44 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {
338338
})
339339
}
340340

341+
func Test_ServerRequest_Evictable(t *testing.T) {
342+
lggr := logger.Test(t)
343+
capability := TestCapability{}
344+
capabilityPeerID := NewP2PPeerID(t)
345+
workflowPeer := NewP2PPeerID(t)
346+
347+
callingDon := commoncap.DON{
348+
Members: []p2ptypes.PeerID{workflowPeer},
349+
ID: 1,
350+
F: 0,
351+
}
352+
353+
newRequest := func(requestTimeout time.Duration) *request.ServerRequest {
354+
req, err := request.NewServerRequest(capability, types.MethodExecute, "capabilityID", 2,
355+
capabilityPeerID, callingDon, "requestMessageID", &testDispatcher{}, requestTimeout, "", lggr)
356+
require.NoError(t, err)
357+
return req
358+
}
359+
360+
t.Run("expired but below minimum retention", func(t *testing.T) {
361+
req := newRequest(20 * time.Millisecond)
362+
time.Sleep(60 * time.Millisecond)
363+
assert.False(t, req.Evictable(200*time.Millisecond))
364+
})
365+
366+
t.Run("expired and retained past minimum retention", func(t *testing.T) {
367+
req := newRequest(20 * time.Millisecond)
368+
time.Sleep(60 * time.Millisecond)
369+
assert.True(t, req.Evictable(10*time.Millisecond))
370+
})
371+
372+
t.Run("minimum retention elapsed but request timeout still active", func(t *testing.T) {
373+
req := newRequest(200 * time.Millisecond)
374+
time.Sleep(60 * time.Millisecond)
375+
assert.False(t, req.Evictable(10*time.Millisecond))
376+
})
377+
}
378+
341379
type serverRequest interface {
342380
OnMessage(ctx context.Context, msg *types.MessageBody) error
343381
}

core/capabilities/remote/executable/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (r *server) expireRequests() {
222222
defer r.receiveLock.Unlock()
223223

224224
for requestID, executeReq := range r.requestIDToRequest {
225-
if executeReq.request.Expired() {
225+
if executeReq.request.Evictable(commoncap.DefaultExecutableRequestTimeout) {
226226
ctx, cancelFn := r.stopCh.NewCtx()
227227
err := executeReq.request.Cancel(ctx, types.Error_TIMEOUT, "request expired by executable server")
228228
cancelFn()

core/capabilities/remote/executable/server_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,3 +1000,101 @@ func Test_Server_Execute_WithConcurrentSetConfig(t *testing.T) {
10001000
expectedResponses := numWorkflowPeers * numExecuteCalls
10011001
require.Equal(t, expectedResponses, successCount)
10021002
}
1003+
1004+
func Test_Server_DuplicateRequestRemainsDedupedPastRequestTimeout(t *testing.T) {
1005+
ctx := testutils.Context(t)
1006+
lggr := logger.Test(t)
1007+
1008+
serverPeerID := NewP2PPeerID(t)
1009+
senderPeerID := NewP2PPeerID(t)
1010+
1011+
dispatcher := &noopDispatcher{}
1012+
server := executable.NewServer("cap_id@1.0.0", "", serverPeerID, dispatcher, lggr)
1013+
1014+
cfg := &commoncap.RemoteExecutableConfig{
1015+
RequestTimeout: 20 * time.Millisecond,
1016+
ServerMaxParallelRequests: 1,
1017+
}
1018+
capInfo := commoncap.CapabilityInfo{
1019+
ID: "cap_id@1.0.0",
1020+
CapabilityType: commoncap.CapabilityTypeTarget,
1021+
}
1022+
localDON := commoncap.DON{
1023+
ID: 1,
1024+
Members: []p2ptypes.PeerID{serverPeerID},
1025+
F: 0,
1026+
}
1027+
workflowDONs := map[uint32]commoncap.DON{
1028+
2: {
1029+
ID: 2,
1030+
Members: []p2ptypes.PeerID{senderPeerID},
1031+
F: 0,
1032+
},
1033+
}
1034+
1035+
require.NoError(t, server.SetConfig(cfg, TestCapability{}, capInfo, localDON, workflowDONs, nil))
1036+
require.NoError(t, server.Start(ctx))
1037+
defer func() {
1038+
require.NoError(t, server.Close())
1039+
}()
1040+
1041+
inputs, err := values.NewMap(map[string]any{"executeValue1": "aValue1"})
1042+
require.NoError(t, err)
1043+
rawRequest, err := pb.MarshalCapabilityRequest(commoncap.CapabilityRequest{
1044+
Metadata: commoncap.RequestMetadata{
1045+
WorkflowExecutionID: "exec-1",
1046+
},
1047+
Inputs: inputs,
1048+
})
1049+
require.NoError(t, err)
1050+
1051+
msg := &remotetypes.MessageBody{
1052+
CapabilityId: capInfo.ID,
1053+
CapabilityDonId: localDON.ID,
1054+
CallerDonId: 2,
1055+
Method: remotetypes.MethodExecute,
1056+
Payload: rawRequest,
1057+
MessageId: []byte(remotetypes.MethodExecute + ":exec-1"),
1058+
Sender: senderPeerID[:],
1059+
Receiver: serverPeerID[:],
1060+
}
1061+
1062+
server.Receive(ctx, msg)
1063+
require.Eventually(t, func() bool { return len(dispatcher.sent) == 1 }, time.Second, 10*time.Millisecond)
1064+
1065+
time.Sleep(2 * cfg.RequestTimeout)
1066+
server.Receive(ctx, msg)
1067+
1068+
time.Sleep(100 * time.Millisecond)
1069+
require.Len(t, dispatcher.sent, 1)
1070+
}
1071+
1072+
type noopDispatcher struct {
1073+
services.StateMachine
1074+
sent []*remotetypes.MessageBody
1075+
}
1076+
1077+
func (n *noopDispatcher) Name() string { return "noopDispatcher" }
1078+
1079+
func (n *noopDispatcher) Start(context.Context) error { return nil }
1080+
1081+
func (n *noopDispatcher) Close() error { return nil }
1082+
1083+
func (n *noopDispatcher) Ready() error { return nil }
1084+
1085+
func (n *noopDispatcher) HealthReport() map[string]error { return nil }
1086+
1087+
func (n *noopDispatcher) SetReceiver(string, uint32, remotetypes.Receiver) error { return nil }
1088+
1089+
func (n *noopDispatcher) RemoveReceiver(string, uint32) {}
1090+
1091+
func (n *noopDispatcher) SetReceiverForMethod(string, uint32, string, remotetypes.Receiver) error {
1092+
return nil
1093+
}
1094+
1095+
func (n *noopDispatcher) RemoveReceiverForMethod(string, uint32, string) {}
1096+
1097+
func (n *noopDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
1098+
n.sent = append(n.sent, msgBody)
1099+
return nil
1100+
}

core/capabilities/vault/gw_handler.go

Lines changed: 2 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,13 @@ package vault
22

33
import (
44
"context"
5-
"encoding/hex"
65
"encoding/json"
76
"errors"
87
"fmt"
9-
"sort"
108
"strings"
119

1210
"go.opentelemetry.io/otel/attribute"
1311
"go.opentelemetry.io/otel/metric"
14-
"google.golang.org/protobuf/proto"
1512

1613
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1714
vaultcommon "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/vault"
@@ -63,22 +60,20 @@ type GatewayHandler struct {
6360
services.Service
6461
eng *services.Engine
6562

66-
capRegistry core.CapabilitiesRegistry
6763
secretsService vaulttypes.SecretsService
6864
gatewayConnector gatewayConnector
6965
requestAuthorizer RequestAuthorizer
7066
lggr logger.Logger
7167
metrics *metrics
7268
}
7369

74-
func NewGatewayHandler(capabilitiesRegistry core.CapabilitiesRegistry, secretsService vaulttypes.SecretsService, connector gatewayConnector, requestAuthorizer RequestAuthorizer, lggr logger.Logger) (*GatewayHandler, error) {
70+
func NewGatewayHandler(secretsService vaulttypes.SecretsService, connector gatewayConnector, requestAuthorizer RequestAuthorizer, lggr logger.Logger) (*GatewayHandler, error) {
7571
metrics, err := newMetrics()
7672
if err != nil {
7773
return nil, fmt.Errorf("failed to create metrics: %w", err)
7874
}
7975

8076
gh := &GatewayHandler{
81-
capRegistry: capabilitiesRegistry,
8277
secretsService: secretsService,
8378
gatewayConnector: connector,
8479
requestAuthorizer: requestAuthorizer,
@@ -112,7 +107,7 @@ func (h *GatewayHandler) ID(ctx context.Context) (string, error) {
112107
}
113108

114109
func (h *GatewayHandler) Methods() []string {
115-
return vaulttypes.GetSupportedMethods(h.lggr)
110+
return vaulttypes.Methods
116111
}
117112

118113
func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, req *jsonrpc.Request[json.RawMessage]) (err error) {
@@ -127,8 +122,6 @@ func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID str
127122
break
128123
}
129124
response = h.handleSecretsCreate(ctx, gatewayID, req, owner)
130-
case vaulttypes.MethodSecretsGet:
131-
response = h.handleSecretsGet(ctx, gatewayID, req)
132125
case vaulttypes.MethodSecretsUpdate:
133126
owner, authErr := h.authorizeAndPrefixRequest(ctx, req)
134127
if authErr != nil {
@@ -256,51 +249,6 @@ func (h *GatewayHandler) handleSecretsUpdate(ctx context.Context, gatewayID stri
256249
return jsonResponse
257250
}
258251

259-
func (h *GatewayHandler) handleSecretsGet(ctx context.Context, gatewayID string, req *jsonrpc.Request[json.RawMessage]) *jsonrpc.Response[json.RawMessage] {
260-
var request vaultcommon.GetSecretsRequest
261-
if err := json.Unmarshal(*req.Params, &request); err != nil {
262-
return h.errorResponse(ctx, gatewayID, req, api.UserMessageParseError, err)
263-
}
264-
encryptionKeys, err := h.getEncryptionKeys(ctx)
265-
if err != nil {
266-
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
267-
}
268-
getSecretsRequest := vaultcommon.GetSecretsRequest{}
269-
for _, reqItem := range request.Requests {
270-
getSecretsRequest.Requests = append(getSecretsRequest.Requests, &vaultcommon.SecretRequest{
271-
Id: &vaultcommon.SecretIdentifier{
272-
Owner: reqItem.Id.Owner,
273-
Namespace: reqItem.Id.Namespace,
274-
Key: reqItem.Id.Key,
275-
},
276-
EncryptionKeys: encryptionKeys,
277-
})
278-
}
279-
vaultCapResponse, err := h.secretsService.GetSecrets(ctx, req.ID, &getSecretsRequest)
280-
if err != nil {
281-
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
282-
}
283-
284-
vaultResponseProto := &vaultcommon.GetSecretsResponse{}
285-
err = proto.Unmarshal(vaultCapResponse.Payload, vaultResponseProto)
286-
if err != nil {
287-
h.lggr.Errorf("Debugging: handleSecretsCreate failed to unmarshal response: %s. Payload was: %s", err.Error(), string(vaultCapResponse.Payload))
288-
return h.errorResponse(ctx, gatewayID, req, api.NodeReponseEncodingError, err)
289-
}
290-
291-
vaultAPIResponseBytes, err := json.Marshal(vaultResponseProto)
292-
if err != nil {
293-
return h.errorResponse(ctx, gatewayID, req, api.NodeReponseEncodingError, err)
294-
}
295-
vaultAPIResponseJSON := json.RawMessage(vaultAPIResponseBytes)
296-
return &jsonrpc.Response[json.RawMessage]{
297-
Version: jsonrpc.JsonRpcVersion,
298-
ID: req.ID,
299-
Method: req.Method,
300-
Result: &vaultAPIResponseJSON,
301-
}
302-
}
303-
304252
func (h *GatewayHandler) handleSecretsDelete(ctx context.Context, gatewayID string, req *jsonrpc.Request[json.RawMessage], owner string) *jsonrpc.Response[json.RawMessage] {
305253
r := &vaultcommon.DeleteSecretsRequest{}
306254
if err := json.Unmarshal(*req.Params, r); err != nil {
@@ -408,26 +356,6 @@ func (h *GatewayHandler) errorResponse(
408356
}
409357
}
410358

411-
// getEncryptionKeys retrieves the encryption keys of all members in the Workflow DON.
412-
func (h *GatewayHandler) getEncryptionKeys(ctx context.Context) ([]string, error) {
413-
myNode, err := h.capRegistry.LocalNode(ctx)
414-
if err != nil {
415-
return nil, errors.New("failed to get local node from registry" + err.Error())
416-
}
417-
418-
encryptionKeys := make([]string, 0, len(myNode.WorkflowDON.Members))
419-
for _, peerID := range myNode.WorkflowDON.Members {
420-
peerNode, err := h.capRegistry.NodeByPeerID(ctx, peerID)
421-
if err != nil {
422-
return nil, errors.New("failed to get node info for peerID: " + peerID.String() + " - " + err.Error())
423-
}
424-
encryptionKeys = append(encryptionKeys, hex.EncodeToString(peerNode.EncryptionPublicKey[:]))
425-
}
426-
// Sort the encryption keys to ensure consistent ordering across all nodes.
427-
sort.Strings(encryptionKeys)
428-
return encryptionKeys, nil
429-
}
430-
431359
func toJSONResponse(vaultCapResponse *vaulttypes.Response, method string) (*jsonrpc.Response[json.RawMessage], error) {
432360
vaultResponseBytes, err := vaultCapResponse.ToJSONRPCResult()
433361
if err != nil {

core/capabilities/vault/gw_handler_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
vaultcommon "github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/vault"
1313
jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2"
14-
core_mocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
1514
vaultcap "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault"
1615
vaultcapmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/mocks"
1716
"github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes"
@@ -162,7 +161,6 @@ func TestGatewayHandler_HandleGatewayMessage(t *testing.T) {
162161
RequestId: "test-secret",
163162
Ids: []*vaultcommon.SecretIdentifier{
164163
{
165-
166164
Key: "Foo",
167165
Namespace: "Bar",
168166
Owner: "0xAbC",
@@ -243,12 +241,11 @@ func TestGatewayHandler_HandleGatewayMessage(t *testing.T) {
243241
t.Run(tt.name, func(t *testing.T) {
244242
secretsService := vaulttypesmocks.NewSecretsService(t)
245243
gwConnector := connector_mocks.NewGatewayConnector(t)
246-
capRegistry := core_mocks.NewCapabilitiesRegistry(t)
247244
requestAuthorizer := vaultcapmocks.NewRequestAuthorizer(t)
248245

249246
tt.setupMocks(secretsService, gwConnector, requestAuthorizer)
250247

251-
handler, err := vaultcap.NewGatewayHandler(capRegistry, secretsService, gwConnector, requestAuthorizer, lggr)
248+
handler, err := vaultcap.NewGatewayHandler(secretsService, gwConnector, requestAuthorizer, lggr)
252249
require.NoError(t, err)
253250

254251
err = handler.HandleGatewayMessage(ctx, "gateway-1", tt.request)
@@ -268,20 +265,19 @@ func TestGatewayHandler_Lifecycle(t *testing.T) {
268265

269266
secretsService := vaulttypesmocks.NewSecretsService(t)
270267
gwConnector := connector_mocks.NewGatewayConnector(t)
271-
capRegistry := core_mocks.NewCapabilitiesRegistry(t)
272268
requestAuthorizer := vaultcapmocks.NewRequestAuthorizer(t)
273269

274-
handler, err := vaultcap.NewGatewayHandler(capRegistry, secretsService, gwConnector, requestAuthorizer, lggr)
270+
handler, err := vaultcap.NewGatewayHandler(secretsService, gwConnector, requestAuthorizer, lggr)
275271
require.NoError(t, err)
276272

277273
t.Run("start", func(t *testing.T) {
278-
gwConnector.On("AddHandler", mock.Anything, vaulttypes.GetSupportedMethods(lggr), handler).Return(nil).Once()
274+
gwConnector.On("AddHandler", mock.Anything, vaulttypes.Methods, handler).Return(nil).Once()
279275
err := handler.Start(ctx)
280276
require.NoError(t, err)
281277
})
282278

283279
t.Run("close", func(t *testing.T) {
284-
gwConnector.On("RemoveHandler", mock.Anything, vaulttypes.GetSupportedMethods(lggr)).Return(nil).Once()
280+
gwConnector.On("RemoveHandler", mock.Anything, vaulttypes.Methods).Return(nil).Once()
285281
err := handler.Close()
286282
require.NoError(t, err)
287283
})

0 commit comments

Comments
 (0)