Skip to content

Commit 6eda8c3

Browse files
vault: add instrumentation and parallelize observation blob broadcasts (#21737)
* vault: parallelize observation blob broadcasts * vault: fix test lint shadowing * vault: log observation timing * Add KV store operation duration metrics to Vault plugin Instrument all public KVStore methods with a histogram metric (platform_vault_plugin_kv_operation_duration_seconds) to help diagnose high P95 Observation latencies in production. Also adds a max-batch Observation test exercising 10 GetSecrets requests with 10 encryption keys each plus 20 pending queue items. * Increase buckets to account for production usage * Instrument KV and BlobBroadcaster/Fetcher interfaces * Instrument KV and BlobBroadcaster/Fetcher interfaces * Go fmt * Track KV durations as ms * Remove test * Go fmt * vault: guard nil test metrics * vault: restore plugin test import --------- Co-authored-by: Cedric Cordenier <cedric.cordenier@smartcontract.com>
1 parent 1d3fb1d commit 6eda8c3

10 files changed

Lines changed: 1190 additions & 256 deletions

File tree

core/services/ocr2/plugins/vault/kvstore.go

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package vault
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"strconv"
8+
"time"
79

810
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
911
"google.golang.org/protobuf/proto"
@@ -20,38 +22,44 @@ const (
2022
)
2123

2224
type KVStore struct {
23-
reader ocr3_1types.KeyValueStateReader
24-
writer ocr3_1types.KeyValueStateReadWriter
25+
reader ocr3_1types.KeyValueStateReader
26+
writer ocr3_1types.KeyValueStateReadWriter
27+
metrics *pluginMetrics
28+
}
29+
30+
func (s *KVStore) trackDuration(ctx context.Context, method string, start time.Time) {
31+
s.metrics.trackKVOperation(ctx, method, time.Since(start).Milliseconds())
2532
}
2633

2734
type ReadKVStore interface {
28-
GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error)
29-
GetMetadata(owner string) (*vault.StoredMetadata, error)
30-
GetSecretIdentifiersCountForOwner(owner string) (int, error)
31-
GetPendingQueue() ([]*vault.StoredPendingQueueItem, error)
35+
GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error)
36+
GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error)
37+
GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error)
38+
GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error)
3239
}
3340

3441
type WriteKVStore interface {
3542
ReadKVStore
36-
WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error
37-
WriteMetadata(owner string, metadata *vault.StoredMetadata) error
38-
DeleteSecret(id *vault.SecretIdentifier) error
39-
WritePendingQueue(pending []*vault.StoredPendingQueueItem) error
43+
WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error
44+
WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error
45+
DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error
46+
WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error
4047
}
4148

42-
func NewReadStore(reader ocr3_1types.KeyValueStateReader) *KVStore {
43-
return &KVStore{reader: reader}
49+
func NewReadStore(reader ocr3_1types.KeyValueStateReader, metrics *pluginMetrics) *KVStore {
50+
return &KVStore{reader: reader, metrics: metrics}
4451
}
4552

46-
func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter) *KVStore {
47-
return &KVStore{reader: writer, writer: writer}
53+
func NewWriteStore(writer ocr3_1types.KeyValueStateReadWriter, metrics *pluginMetrics) *KVStore {
54+
return &KVStore{reader: writer, writer: writer, metrics: metrics}
4855
}
4956

50-
func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, error) {
57+
func (s *KVStore) GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error) {
58+
defer s.trackDuration(ctx, "GetSecret", time.Now())
5159
if id == nil {
5260
return nil, errors.New("id cannot be nil")
5361
}
54-
found, err := s.metadataContainsID(id)
62+
found, err := s.metadataContainsID(ctx, id)
5563
if err != nil {
5664
return nil, fmt.Errorf("failed to check if metadata contains id: %w", err)
5765
}
@@ -77,7 +85,8 @@ func (s *KVStore) GetSecret(id *vault.SecretIdentifier) (*vault.StoredSecret, er
7785
return secret, nil
7886
}
7987

80-
func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) {
88+
func (s *KVStore) GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error) {
89+
defer s.trackDuration(ctx, "GetMetadata", time.Now())
8190
b, err := s.reader.Read([]byte(metadataPrefix + owner))
8291
if err != nil {
8392
return nil, fmt.Errorf("failed to read metadata: %w", err)
@@ -95,8 +104,9 @@ func (s *KVStore) GetMetadata(owner string) (*vault.StoredMetadata, error) {
95104
return md, nil
96105
}
97106

98-
func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) {
99-
md, err := s.GetMetadata(owner)
107+
func (s *KVStore) GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error) {
108+
defer s.trackDuration(ctx, "GetSecretIdentifiersCountForOwner", time.Now())
109+
md, err := s.GetMetadata(ctx, owner)
100110
if err != nil {
101111
return 0, fmt.Errorf("failed to get metadata for owner %s: %w", owner, err)
102112
}
@@ -108,7 +118,8 @@ func (s *KVStore) GetSecretIdentifiersCountForOwner(owner string) (int, error) {
108118
return count, nil
109119
}
110120

111-
func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) error {
121+
func (s *KVStore) WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error {
122+
defer s.trackDuration(ctx, "WriteMetadata", time.Now())
112123
if metadata == nil {
113124
return errors.New("metadata cannot be nil")
114125
}
@@ -125,11 +136,11 @@ func (s *KVStore) WriteMetadata(owner string, metadata *vault.StoredMetadata) er
125136
return nil
126137
}
127138

128-
func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) {
139+
func (s *KVStore) metadataContainsID(ctx context.Context, id *vault.SecretIdentifier) (bool, error) {
129140
if id == nil {
130141
return false, errors.New("id cannot be nil")
131142
}
132-
md, err := s.GetMetadata(id.Owner)
143+
md, err := s.GetMetadata(ctx, id.Owner)
133144
if err != nil {
134145
return false, fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
135146
}
@@ -147,11 +158,11 @@ func (s *KVStore) metadataContainsID(id *vault.SecretIdentifier) (bool, error) {
147158
return false, nil
148159
}
149160

150-
func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error {
161+
func (s *KVStore) addIDToMetadata(ctx context.Context, id *vault.SecretIdentifier) error {
151162
if id == nil {
152163
return errors.New("id cannot be nil")
153164
}
154-
md, err := s.GetMetadata(id.Owner)
165+
md, err := s.GetMetadata(ctx, id.Owner)
155166
if err != nil {
156167
return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
157168
}
@@ -171,19 +182,19 @@ func (s *KVStore) addIDToMetadata(id *vault.SecretIdentifier) error {
171182
md.SecretIdentifiers = append(md.SecretIdentifiers, id)
172183
}
173184

174-
err = s.WriteMetadata(id.Owner, md)
185+
err = s.WriteMetadata(ctx, id.Owner, md)
175186
if err != nil {
176187
return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err)
177188
}
178189

179190
return nil
180191
}
181192

182-
func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error {
193+
func (s *KVStore) removeIDFromMetadata(ctx context.Context, id *vault.SecretIdentifier) error {
183194
if id == nil {
184195
return errors.New("id cannot be nil")
185196
}
186-
md, err := s.GetMetadata(id.Owner)
197+
md, err := s.GetMetadata(ctx, id.Owner)
187198
if err != nil {
188199
return fmt.Errorf("failed to get metadata for owner %s: %w", id.Owner, err)
189200
}
@@ -209,15 +220,16 @@ func (s *KVStore) removeIDFromMetadata(id *vault.SecretIdentifier) error {
209220
newMd := &vault.StoredMetadata{
210221
SecretIdentifiers: si,
211222
}
212-
err = s.WriteMetadata(id.Owner, newMd)
223+
err = s.WriteMetadata(ctx, id.Owner, newMd)
213224
if err != nil {
214225
return fmt.Errorf("failed to write metadata for owner %s: %w", id.Owner, err)
215226
}
216227

217228
return nil
218229
}
219230

220-
func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSecret) error {
231+
func (s *KVStore) WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error {
232+
defer s.trackDuration(ctx, "WriteSecret", time.Now())
221233
if id == nil {
222234
return errors.New("id cannot be nil")
223235
}
@@ -231,18 +243,19 @@ func (s *KVStore) WriteSecret(id *vault.SecretIdentifier, secret *vault.StoredSe
231243
return fmt.Errorf("failed to write secret: %w", err)
232244
}
233245

234-
if err := s.addIDToMetadata(id); err != nil {
246+
if err := s.addIDToMetadata(ctx, id); err != nil {
235247
return fmt.Errorf("failed to add id to metadata: %w", err)
236248
}
237249

238250
return nil
239251
}
240252

241-
func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error {
253+
func (s *KVStore) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error {
254+
defer s.trackDuration(ctx, "DeleteSecret", time.Now())
242255
if id == nil {
243256
return errors.New("id cannot be nil")
244257
}
245-
err := s.removeIDFromMetadata(id)
258+
err := s.removeIDFromMetadata(ctx, id)
246259
if err != nil {
247260
return fmt.Errorf("failed to remove id from metadata: %w", err)
248261
}
@@ -255,7 +268,8 @@ func (s *KVStore) DeleteSecret(id *vault.SecretIdentifier) error {
255268
return nil
256269
}
257270

258-
func (s *KVStore) GetPendingQueue() ([]*vault.StoredPendingQueueItem, error) {
271+
func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) {
272+
defer s.trackDuration(ctx, "GetPendingQueue", time.Now())
259273
indexBytes, err := s.reader.Read([]byte(pendingQueueIndex))
260274
if err != nil {
261275
return nil, fmt.Errorf("failed to read pending queue index: %w", err)
@@ -320,7 +334,8 @@ func (s *KVStore) deletePendingQueue() error {
320334
return nil
321335
}
322336

323-
func (s *KVStore) WritePendingQueue(pending []*vault.StoredPendingQueueItem) error {
337+
func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error {
338+
defer s.trackDuration(ctx, "WritePendingQueue", time.Now())
324339
err := s.deletePendingQueue()
325340
if err != nil {
326341
return fmt.Errorf("failed to delete pending requests: %w", err)

0 commit comments

Comments
 (0)