Skip to content

Commit 6bf7d2c

Browse files
feat: add KVStoreWrapper with owner migration adapter for vault org_id migration (#21691)
* feat: add OwnerMigrationStore adapter for vault org_id migration Add a transparent adapter layer (OwnerMigrationReadStore / OwnerMigrationWriteStore) that sits between the vault plugin and the KV store to handle the migration of secrets from workflow_owner-keyed entries to org_id-keyed entries. The adapter implements the same ReadKVStore / WriteKVStore interfaces and provides: - Dual-owner lookup on reads (org_id first, workflow_owner fallback) - Metadata merging and deduplication across both owners for list operations - org_id-based writes for all new/updated secrets - Lazy migration: deletes legacy workflow_owner entries on update - Dual-owner deletion with cleanup of both owners - Pass-through for pending queue operations (not owner-scoped) This is a standalone component (not wired into the plugin yet) with comprehensive unit tests covering all operations and migration scenarios. * refactor: rename to KVStoreWrapper with internal ownerMigrationAdapter Restructure the owner migration layer into two types: - KVStoreWrapper (exported): the abstraction the plugin interacts with - ownerMigrationAdapter (unexported): internal migration logic orgID/workflowOwner are passed per method call rather than stored in the struct, matching the plugin's one-store-per-call pattern where a single instance processes batches with different owners. Add Criticalw log on unexpected duplicate during metadata merge. Rename files: owner_migration_store.go → kvstore_wrapper.go. Made-with: Cursor * feat: gate KVStoreWrapper with migrationEnabled flag Add migrationEnabled bool to KVStoreWrapper constructor. When false, all methods pass through directly to the inner KVStore, bypassing the ownerMigrationAdapter entirely. The caller resolves the gate from cresettings.Default.VaultOrgIdAsSecretOwnerEnabled. Add 14 unit tests verifying passthrough behavior when disabled. Made-with: Cursor * fix: update KVStoreWrapper for context.Context interface changes after develop merge - Add context.Context as first param to all KVStoreWrapper and adapter methods - Rename test helper to newMigrationTestStore to avoid redeclaration with kvstore_test.go - Update NewWriteStore calls to include pluginMetrics parameter Made-with: Cursor
1 parent 584f557 commit 6bf7d2c

2 files changed

Lines changed: 1203 additions & 0 deletions

File tree

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
package vault
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/actions/vault"
8+
"github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes"
9+
"github.com/smartcontractkit/chainlink/v2/core/logger"
10+
)
11+
12+
// KVStoreWrapper provides a migration-aware layer over the underlying KVStore.
13+
// A single instance is created per plugin function call (matching the existing
14+
// one-store-per-call pattern), and orgID/workflowOwner are passed per operation
15+
// since a batch may contain requests from different owners.
16+
//
17+
// When migrationEnabled is false the wrapper is a pure pass-through: every
18+
// call goes directly to the inner store and the orgID/workflowOwner parameters
19+
// are ignored. Gate this with cresettings.Default.VaultOrgIdAsSecretOwnerEnabled.
20+
type KVStoreWrapper struct {
21+
store WriteKVStore
22+
adapter *ownerMigrationAdapter
23+
migrationEnabled bool
24+
}
25+
26+
// NewKVStoreWrapper creates a wrapper around the given store.
27+
// When migrationEnabled is true, an internal ownerMigrationAdapter handles
28+
// the transition from workflow_owner-keyed entries to org_id-keyed entries.
29+
// When false, all operations pass through directly to the inner store.
30+
func NewKVStoreWrapper(store WriteKVStore, migrationEnabled bool, lggr logger.Logger) *KVStoreWrapper {
31+
w := &KVStoreWrapper{
32+
store: store,
33+
migrationEnabled: migrationEnabled,
34+
}
35+
if migrationEnabled {
36+
w.adapter = newOwnerMigrationAdapter(store, lggr)
37+
}
38+
return w
39+
}
40+
41+
// GetSecret tries orgID first, falling back to workflowOwner for legacy entries.
42+
// When migration is disabled, delegates directly to the inner store using id as-is.
43+
func (w *KVStoreWrapper) GetSecret(ctx context.Context, id *vault.SecretIdentifier, orgID, workflowOwner string) (*vault.StoredSecret, error) {
44+
if !w.migrationEnabled {
45+
return w.store.GetSecret(ctx, id)
46+
}
47+
return w.adapter.getSecret(ctx, id, orgID, workflowOwner)
48+
}
49+
50+
// GetMetadata merges metadata from both orgID and workflowOwner, deduplicating
51+
// by namespace::key and rewriting all Owner fields to orgID.
52+
// When migration is disabled, delegates directly to the inner store using orgID.
53+
//
54+
// The merged count cannot exceed the per-owner secret limit: deduplication by
55+
// namespace::key collapses entries that exist under both owners (transient
56+
// mid-migration state) into a single entry, so the result reflects the true
57+
// number of unique secrets the owner has.
58+
func (w *KVStoreWrapper) GetMetadata(ctx context.Context, orgID, workflowOwner string) (*vault.StoredMetadata, error) {
59+
if !w.migrationEnabled {
60+
return w.store.GetMetadata(ctx, orgID)
61+
}
62+
return w.adapter.getMetadata(ctx, orgID, workflowOwner)
63+
}
64+
65+
// GetSecretIdentifiersCountForOwner returns the count of unique secrets across
66+
// both orgID and workflowOwner after deduplication.
67+
// When migration is disabled, delegates directly to the inner store using orgID.
68+
func (w *KVStoreWrapper) GetSecretIdentifiersCountForOwner(ctx context.Context, orgID, workflowOwner string) (int, error) {
69+
if !w.migrationEnabled {
70+
return w.store.GetSecretIdentifiersCountForOwner(ctx, orgID)
71+
}
72+
return w.adapter.getSecretIdentifiersCountForOwner(ctx, orgID, workflowOwner)
73+
}
74+
75+
// WriteSecret writes the secret under orgID. If a legacy entry exists under
76+
// workflowOwner with the same namespace/key, it is deleted (lazy migration).
77+
// When migration is disabled, delegates directly to the inner store.
78+
func (w *KVStoreWrapper) WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret, orgID, workflowOwner string) error {
79+
if !w.migrationEnabled {
80+
return w.store.WriteSecret(ctx, id, secret)
81+
}
82+
return w.adapter.writeSecret(ctx, id, secret, orgID, workflowOwner)
83+
}
84+
85+
// WriteMetadata writes metadata under orgID.
86+
// When migration is disabled, delegates directly to the inner store.
87+
func (w *KVStoreWrapper) WriteMetadata(ctx context.Context, orgID string, metadata *vault.StoredMetadata) error {
88+
if !w.migrationEnabled {
89+
return w.store.WriteMetadata(ctx, orgID, metadata)
90+
}
91+
return w.adapter.writeMetadata(ctx, orgID, metadata)
92+
}
93+
94+
// DeleteSecret deletes the secret from orgID if present, falling back to
95+
// workflowOwner for legacy entries. If the secret exists under both owners
96+
// (transient mid-migration state), both entries are deleted.
97+
// When migration is disabled, delegates directly to the inner store.
98+
func (w *KVStoreWrapper) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier, orgID, workflowOwner string) error {
99+
if !w.migrationEnabled {
100+
return w.store.DeleteSecret(ctx, id)
101+
}
102+
return w.adapter.deleteSecret(ctx, id, orgID, workflowOwner)
103+
}
104+
105+
// GetPendingQueue is always a pass-through (pending queue is not owner-scoped).
106+
func (w *KVStoreWrapper) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) {
107+
return w.store.GetPendingQueue(ctx)
108+
}
109+
110+
// WritePendingQueue is always a pass-through (pending queue is not owner-scoped).
111+
func (w *KVStoreWrapper) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error {
112+
return w.store.WritePendingQueue(ctx, pending)
113+
}
114+
115+
// ownerMigrationAdapter handles the migration of secrets from workflow_owner-keyed
116+
// entries to org_id-keyed entries. It performs dual-lookup reads, org_id-based
117+
// writes, lazy migration on update, metadata merge for list, and dual-owner
118+
// deletion.
119+
type ownerMigrationAdapter struct {
120+
store WriteKVStore
121+
lggr logger.Logger
122+
}
123+
124+
func newOwnerMigrationAdapter(store WriteKVStore, lggr logger.Logger) *ownerMigrationAdapter {
125+
return &ownerMigrationAdapter{store: store, lggr: lggr}
126+
}
127+
128+
func (a *ownerMigrationAdapter) getSecret(ctx context.Context, id *vault.SecretIdentifier, orgID, workflowOwner string) (*vault.StoredSecret, error) {
129+
if id == nil {
130+
return a.store.GetSecret(ctx, id)
131+
}
132+
133+
orgIDSid := withOwner(id, orgID)
134+
secret, err := a.store.GetSecret(ctx, orgIDSid)
135+
if err != nil {
136+
return nil, err
137+
}
138+
if secret != nil {
139+
return secret, nil
140+
}
141+
142+
if !needsMigration(orgID, workflowOwner) {
143+
return nil, nil
144+
}
145+
146+
woSid := withOwner(id, workflowOwner)
147+
return a.store.GetSecret(ctx, woSid)
148+
}
149+
150+
func (a *ownerMigrationAdapter) getMetadata(ctx context.Context, orgID, workflowOwner string) (*vault.StoredMetadata, error) {
151+
orgMd, err := a.store.GetMetadata(ctx, orgID)
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
if !needsMigration(orgID, workflowOwner) {
157+
return orgMd, nil
158+
}
159+
160+
woMd, err := a.store.GetMetadata(ctx, workflowOwner)
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
return mergeMetadata(orgMd, woMd, orgID, a.lggr), nil
166+
}
167+
168+
func (a *ownerMigrationAdapter) getSecretIdentifiersCountForOwner(ctx context.Context, orgID, workflowOwner string) (int, error) {
169+
md, err := a.getMetadata(ctx, orgID, workflowOwner)
170+
if err != nil {
171+
return 0, err
172+
}
173+
if md == nil {
174+
return 0, nil
175+
}
176+
return len(md.SecretIdentifiers), nil
177+
}
178+
179+
func (a *ownerMigrationAdapter) writeSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret, orgID, workflowOwner string) error {
180+
if id == nil {
181+
return a.store.WriteSecret(ctx, id, secret)
182+
}
183+
184+
orgIDSid := withOwner(id, orgID)
185+
if err := a.store.WriteSecret(ctx, orgIDSid, secret); err != nil {
186+
return err
187+
}
188+
189+
if !needsMigration(orgID, workflowOwner) {
190+
return nil
191+
}
192+
193+
woSid := withOwner(id, workflowOwner)
194+
legacySecret, err := a.store.GetSecret(ctx, woSid)
195+
if err != nil {
196+
return fmt.Errorf("failed to check for legacy entry during write: %w", err)
197+
}
198+
if legacySecret != nil {
199+
if err := a.store.DeleteSecret(ctx, woSid); err != nil {
200+
return fmt.Errorf("failed to delete legacy entry during migration: %w", err)
201+
}
202+
}
203+
204+
return nil
205+
}
206+
207+
func (a *ownerMigrationAdapter) writeMetadata(ctx context.Context, orgID string, metadata *vault.StoredMetadata) error {
208+
return a.store.WriteMetadata(ctx, orgID, metadata)
209+
}
210+
211+
func (a *ownerMigrationAdapter) deleteSecret(ctx context.Context, id *vault.SecretIdentifier, orgID, workflowOwner string) error {
212+
if id == nil {
213+
return a.store.DeleteSecret(ctx, id)
214+
}
215+
216+
orgIDSid := withOwner(id, orgID)
217+
orgSecret, err := a.store.GetSecret(ctx, orgIDSid)
218+
if err != nil {
219+
return fmt.Errorf("failed to check org_id entry for deletion: %w", err)
220+
}
221+
if orgSecret != nil {
222+
if err := a.store.DeleteSecret(ctx, orgIDSid); err != nil {
223+
return fmt.Errorf("failed to delete org_id entry: %w", err)
224+
}
225+
if needsMigration(orgID, workflowOwner) {
226+
woSid := withOwner(id, workflowOwner)
227+
woSecret, woErr := a.store.GetSecret(ctx, woSid)
228+
if woErr != nil {
229+
return fmt.Errorf("failed to check legacy entry after org_id deletion: %w", woErr)
230+
}
231+
if woSecret != nil {
232+
if woErr = a.store.DeleteSecret(ctx, woSid); woErr != nil {
233+
return fmt.Errorf("failed to clean up legacy entry after org_id deletion: %w", woErr)
234+
}
235+
}
236+
}
237+
return nil
238+
}
239+
240+
if needsMigration(orgID, workflowOwner) {
241+
woSid := withOwner(id, workflowOwner)
242+
woSecret, woErr := a.store.GetSecret(ctx, woSid)
243+
if woErr != nil {
244+
return fmt.Errorf("failed to check legacy entry for deletion: %w", woErr)
245+
}
246+
if woSecret != nil {
247+
return a.store.DeleteSecret(ctx, woSid)
248+
}
249+
}
250+
251+
// Not found under either owner — delegate to inner which will produce
252+
// the appropriate error from metadata removal.
253+
return a.store.DeleteSecret(ctx, orgIDSid)
254+
}
255+
256+
// --- shared helpers ---
257+
258+
func withOwner(id *vault.SecretIdentifier, owner string) *vault.SecretIdentifier {
259+
return &vault.SecretIdentifier{
260+
Key: id.Key,
261+
Namespace: id.Namespace,
262+
Owner: owner,
263+
}
264+
}
265+
266+
func needsMigration(orgID, workflowOwner string) bool {
267+
return workflowOwner != "" && workflowOwner != orgID
268+
}
269+
270+
// mergeMetadata combines metadata from org_id and workflow_owner, deduplicating
271+
// by namespace::key and rewriting all Owner fields to orgID.
272+
func mergeMetadata(orgMd, woMd *vault.StoredMetadata, orgID string, lggr logger.Logger) *vault.StoredMetadata {
273+
if orgMd == nil && woMd == nil {
274+
return nil
275+
}
276+
277+
seen := map[string]bool{}
278+
var merged []*vault.SecretIdentifier
279+
280+
addEntries := func(md *vault.StoredMetadata, source string) {
281+
if md == nil {
282+
return
283+
}
284+
for _, id := range md.SecretIdentifiers {
285+
dk := deduplicationKey(id)
286+
if seen[dk] {
287+
lggr.Criticalw(
288+
"duplicate secret identifier found during owner migration metadata merge",
289+
"orgID", orgID,
290+
"duplicateKey", dk,
291+
"namespace", id.Namespace,
292+
"key", id.Key,
293+
"owner", id.Owner,
294+
"source", source,
295+
)
296+
continue
297+
}
298+
seen[dk] = true
299+
merged = append(merged, &vault.SecretIdentifier{
300+
Key: id.Key,
301+
Namespace: id.Namespace,
302+
Owner: orgID,
303+
})
304+
}
305+
}
306+
307+
// org_id entries take priority in deduplication.
308+
addEntries(orgMd, "org_id")
309+
addEntries(woMd, "workflow_owner")
310+
311+
return &vault.StoredMetadata{
312+
SecretIdentifiers: merged,
313+
}
314+
}
315+
316+
func deduplicationKey(id *vault.SecretIdentifier) string {
317+
namespace := id.Namespace
318+
if namespace == "" {
319+
namespace = vaulttypes.DefaultNamespace
320+
}
321+
return namespace + "::" + id.Key
322+
}

0 commit comments

Comments
 (0)