Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/large-wombats-fall.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": patch
---

fix(datastore): preserve staged deletes across Merge composition
36 changes: 32 additions & 4 deletions datastore/memory_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,26 @@ func (s *MemoryDataStore) WriteMetadata(bundle MetadataBundle, opts ...WriteMeta
return WriteMetadataToDataStore(s, bundle, opts...)
}

// Merge merges the given mutable data store into the current MemoryDataStore.
// Merge applies records and staged deletions from other onto this MemoryDataStore.
//
// Staged deletions (other.<Store>.DeletedRemoteKeys) are propagated by appending the
// key to s.<Store>.DeletedRemoteKeys via RemoteDelete and then removing the record
// from s.<Store>.Records via Delete (tolerating the per-store NotFound sentinel).
// The DeletedRemoteKeys append is what lets chained operations preserve delete intent
// across intermediate Merges.
//
// NotFound on the Delete step is tolerated because the source's staged key may
// legitimately not be present in the destination's Records (RemoteDelete is allowed
// to stage deletes for records that exist only in the remote backing store), and
// because we want repeated Merges of the same source to be idempotent.
//
// Precedence: a live record in other.<Store>.Records overrides a staged delete in
// s.<Store>.DeletedRemoteKeys for the same key. This is a side-effect of Upsert,
// which clears the key from DeletedRemoteKeys whenever a record is upserted (so a
// direct `RemoteDelete(k); Upsert(rec-with-k)` cancels the staged delete on the
// same store). Staged deletes are only "sticky" on the source side. Callers that
// need a destination-side staged delete to survive Merge must ensure the source
// either omits the live record or stages the delete itself.
Comment on lines +78 to +84

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar here too? Do users need to know all this before using it? I find readin this kind of overwhelming :D

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this context is important for explaining the behavior of Merge(), as some aspects of its implementation can otherwise appear opaque or difficult to understand.

func (s *MemoryDataStore) Merge(other DataStore) error {
// Fetch address ref records from the other data store
addressRefs, err := other.Addresses().Fetch()
Expand All @@ -84,7 +103,10 @@ func (s *MemoryDataStore) Merge(other DataStore) error {
if keyErr != nil {
return fmt.Errorf("failed to parse address ref key: %w", keyErr)
}
if err = s.AddressRefStore.Delete(key); err != nil {
if err = s.AddressRefStore.RemoteDelete(key); err != nil {
return err
}
if err = s.AddressRefStore.Delete(key); err != nil && !errors.Is(err, ErrAddressRefNotFound) {
return err
}
}
Expand All @@ -109,7 +131,10 @@ func (s *MemoryDataStore) Merge(other DataStore) error {
if keyErr != nil {
return fmt.Errorf("failed to parse chain metadata key: %w", keyErr)
}
if err = s.ChainMetadataStore.Delete(key); err != nil {
if err = s.ChainMetadataStore.RemoteDelete(key); err != nil {
return err
}
if err = s.ChainMetadataStore.Delete(key); err != nil && !errors.Is(err, ErrChainMetadataNotFound) {
return err
}
}
Expand All @@ -135,7 +160,10 @@ func (s *MemoryDataStore) Merge(other DataStore) error {
if keyErr != nil {
return fmt.Errorf("failed to parse contract metadata key: %w", keyErr)
}
if err = s.ContractMetadataStore.Delete(key); err != nil {
if err = s.ContractMetadataStore.RemoteDelete(key); err != nil {
return err
}
if err = s.ContractMetadataStore.Delete(key); err != nil && !errors.Is(err, ErrContractMetadataNotFound) {
return err
}
}
Expand Down
136 changes: 127 additions & 9 deletions datastore/memory_datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func TestMemoryDataStore_Merge(t *testing.T) {
excpectedChainMetadataCount int
expectedContractMetadataCount int
expectedError error
expectedAddressDRK []string
expectedChainMetaDRK []string
expectedContractMetaDRK []string
}{
{
name: "Merge single address",
Expand All @@ -72,7 +75,7 @@ func TestMemoryDataStore_Merge(t *testing.T) {
expectedContractMetadataCount: 1,
},
{
name: "Merge deletions errors: delete address ref record that does not exist produces an error",
name: "Merge propagate deletions: stages delete for address ref record not present in destination",
setup: func() (*MemoryDataStore, *MemoryDataStore) {
dataStore1 := NewMemoryDataStore()
dataStore2 := NewMemoryDataStore()
Expand All @@ -90,17 +93,18 @@ func TestMemoryDataStore_Merge(t *testing.T) {
require.NoError(t, err, "Adding env metadata to dataStore1 should not fail")

// dataStore2 stages a record for deletion that does not exist in dataStore1
require.NoError(t, dataStore2.Addresses().RemoteDelete(NewAddressRefKey(0, "typeA", semver.MustParse("1.0.0"), "q")))
stagedKey := NewAddressRefKey(0, "typeA", semver.MustParse("1.0.0"), "q")
require.NoError(t, dataStore2.Addresses().RemoteDelete(stagedKey))

return dataStore1, dataStore2
},
expectedAddrRefsCount: 1,
excpectedChainMetadataCount: 1,
expectedContractMetadataCount: 1,
expectedError: ErrAddressRefNotFound,
expectedAddressDRK: []string{NewAddressRefKey(0, "typeA", semver.MustParse("1.0.0"), "q").String()},
},
{
name: "Merge deletions errors: delete chain metadata record that does not exist produces an error",
name: "Merge propagate deletions: stages delete for chain metadata record not present in destination",
setup: func() (*MemoryDataStore, *MemoryDataStore) {
dataStore1 := NewMemoryDataStore()
dataStore2 := NewMemoryDataStore()
Expand All @@ -118,17 +122,18 @@ func TestMemoryDataStore_Merge(t *testing.T) {
require.NoError(t, err, "Adding env metadata to dataStore2 should not fail")

// dataStore2 stages a record for deletion that does not exist in dataStore1
require.NoError(t, dataStore2.ChainMetadata().RemoteDelete(NewChainMetadataKey(10)))
stagedKey := NewChainMetadataKey(10)
require.NoError(t, dataStore2.ChainMetadata().RemoteDelete(stagedKey))

return dataStore1, dataStore2
},
expectedAddrRefsCount: 1,
excpectedChainMetadataCount: 1,
expectedContractMetadataCount: 1,
expectedError: ErrChainMetadataNotFound,
expectedChainMetaDRK: []string{NewChainMetadataKey(10).String()},
},
{
name: "Merge deletions errors: delete contract metadata record that does not exist produces an error",
name: "Merge propagate deletions: stages delete for contract metadata record not present in destination",
setup: func() (*MemoryDataStore, *MemoryDataStore) {
dataStore1 := NewMemoryDataStore()
dataStore2 := NewMemoryDataStore()
Expand All @@ -146,14 +151,15 @@ func TestMemoryDataStore_Merge(t *testing.T) {
require.NoError(t, err, "Adding env metadata to dataStore2 should not fail")

// dataStore2 stages a record for deletion that does not exist in dataStore1
require.NoError(t, dataStore2.ContractMetadata().RemoteDelete(NewContractMetadataKey(10, "0x111")))
stagedKey := NewContractMetadataKey(10, "0x111")
require.NoError(t, dataStore2.ContractMetadata().RemoteDelete(stagedKey))

return dataStore1, dataStore2
},
expectedAddrRefsCount: 1,
excpectedChainMetadataCount: 1,
expectedContractMetadataCount: 1,
expectedError: ErrContractMetadataNotFound,
expectedContractMetaDRK: []string{NewContractMetadataKey(10, "0x111").String()},
},
{
name: "Merge propagate deletions: deletes record from remote data store",
Expand Down Expand Up @@ -183,6 +189,9 @@ func TestMemoryDataStore_Merge(t *testing.T) {
expectedAddrRefsCount: 0,
excpectedChainMetadataCount: 0,
expectedContractMetadataCount: 0,
expectedAddressDRK: []string{addressRefRecord.Key().String()},
expectedChainMetaDRK: []string{chainMetadataRecord.Key().String()},
expectedContractMetaDRK: []string{contractMetadataRecord.Key().String()},
},
{
name: "Match existing address with labels",
Expand Down Expand Up @@ -275,6 +284,115 @@ func TestMemoryDataStore_Merge(t *testing.T) {

_, err = dataStore1.EnvMetadata().Get()
require.NoError(t, err, "Fetching env metadata from dataStore1 should not fail")

// Verify staged deletes were propagated to DeletedRemoteKeys
if len(tt.expectedAddressDRK) > 0 {
require.ElementsMatch(t, tt.expectedAddressDRK, dataStore1.AddressRefStore.DeletedRemoteKeys,
"dataStore1 AddressRefStore.DeletedRemoteKeys should contain the expected staged keys after merge")
}
if len(tt.expectedChainMetaDRK) > 0 {
require.ElementsMatch(t, tt.expectedChainMetaDRK, dataStore1.ChainMetadataStore.DeletedRemoteKeys,
"dataStore1 ChainMetadataStore.DeletedRemoteKeys should contain the expected staged keys after merge")
}
if len(tt.expectedContractMetaDRK) > 0 {
require.ElementsMatch(t, tt.expectedContractMetaDRK, dataStore1.ContractMetadataStore.DeletedRemoteKeys,
"dataStore1 ContractMetadataStore.DeletedRemoteKeys should contain the expected staged keys after merge")
}
})
}
}

func TestMemoryDataStore_Merge_ChainedComposition(t *testing.T) {
t.Parallel()

recA := AddressRef{
Address: "0xAAA",
Type: "typeA",
Version: semver.MustParse("2.0.0"),
Qualifier: "qa",
}

t.Run("Chained merge preserves DRK across two hops", func(t *testing.T) {
t.Parallel()

dataStore1 := NewMemoryDataStore()
require.NoError(t, dataStore1.Addresses().Add(recA))

dataStore2 := NewMemoryDataStore()
require.NoError(t, dataStore2.Addresses().RemoteDelete(recA.Key()))

dataStore3 := NewMemoryDataStore()
require.NoError(t, dataStore3.Merge(dataStore2.Seal()))

dataStore4 := NewMemoryDataStore()
require.NoError(t, dataStore4.Merge(dataStore3.Seal()))

require.Contains(t, dataStore4.AddressRefStore.DeletedRemoteKeys, recA.Key().String(),
"DRK should survive two merge hops")

addressRefs, err := dataStore4.Addresses().Fetch()
require.NoError(t, err)
require.Empty(t, addressRefs, "recA should not appear in dataStore4 records after chained deletes")
})

t.Run("Merge is idempotent on staged deletes", func(t *testing.T) {
t.Parallel()

dataStore1 := NewMemoryDataStore()
require.NoError(t, dataStore1.Addresses().Add(recA))

dataStore2 := NewMemoryDataStore()
require.NoError(t, dataStore2.Addresses().RemoteDelete(recA.Key()))

sealed := dataStore2.Seal()

require.NoError(t, dataStore1.Merge(sealed), "first merge should succeed")
require.NoError(t, dataStore1.Merge(sealed), "second merge should succeed (idempotent)")

addressRefs, err := dataStore1.Addresses().Fetch()
require.NoError(t, err)
require.Empty(t, addressRefs, "recA should not be present after idempotent merges")

require.Contains(t, dataStore1.AddressRefStore.DeletedRemoteKeys, recA.Key().String(),
"DRK should be present after idempotent merges")
})
}

// TestMemoryDataStore_Merge_SourceLiveRecordClearsDestStagedDelete pins the
// precedence rule documented on MemoryDataStore.Merge: when src.<Store>.Records
// contains a live record for key K and src.<Store>.DeletedRemoteKeys does NOT
// contain K, Merge's upsert clears K from dst.<Store>.DeletedRemoteKeys.
//
// In other words, a live record on the source side overrides a staged delete on
// the destination side for the same key. Staged deletes are sticky only on the
// source side of Merge.
//
// If this test starts failing, the precedence note on MemoryDataStore.Merge is
// out of date — update either the assertion or the docstring to keep them in sync.
func TestMemoryDataStore_Merge_SourceLiveRecordClearsDestStagedDelete(t *testing.T) {
t.Parallel()

rec := AddressRef{
Address: "0xBEEF",
Type: "typeP",
Version: semver.MustParse("1.0.0"),
Qualifier: "qP",
}

dst := NewMemoryDataStore()
require.NoError(t, dst.Addresses().RemoteDelete(rec.Key()))
require.Contains(t, dst.AddressRefStore.DeletedRemoteKeys, rec.Key().String(),
"precondition: dst.DRK should contain the staged key before Merge")

src := NewMemoryDataStore()
require.NoError(t, src.Addresses().Add(rec))

require.NoError(t, dst.Merge(src.Seal()))

addressRefs, err := dst.Addresses().Fetch()
require.NoError(t, err)
require.Len(t, addressRefs, 1, "dst should now contain src's live record")

require.NotContains(t, dst.AddressRefStore.DeletedRemoteKeys, rec.Key().String(),
"dst.DRK should no longer contain the previously-staged key — Upsert cleared it")
}
Loading