Skip to content

Commit 3ba8202

Browse files
[CLD-755]: fix(catalog/memory): replace pgtest with in memory map (#507)
The current memory catalog client implementation uses pgtest which requires postgres binary available in the host machine and also actually spins up a postgres service when running the test. We want a true in memory implementation. Remove usage of pgtest and replace with map for a true inmemory client implementation of catalog JIRA: https://smartcontract-it.atlassian.net/browse/CLD-755 Previously ``` github.com/smartcontractkit/chainlink-deployments-framework/datastore/catalog/memory 36.685s coverage: 83.3% of statements ``` Now ``` github.com/smartcontractkit/chainlink-deployments-framework/datastore/catalog/memory 1.028s coverage: 84.8% of statements ```
1 parent 62ed5d0 commit 3ba8202

17 files changed

+781
-969
lines changed

.changeset/light-ends-hammer.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"chainlink-deployments-framework": patch
3+
---
4+
5+
fix(catalog/memory): convert to true inmemory implementation
6+
7+
Remove dependency on pgtest

datastore/catalog/memory/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,3 @@ This implementation maintains the Catalog stores in-memory as simple tables,
44
for use in tests. It is test-only, and should not be used in any changeset
55
execution into a deployed environment.
66

7-
> TODO: Implement the in-memory catalog.

datastore/catalog/memory/address_ref_store.go

Lines changed: 24 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,22 @@ package memory
22

33
import (
44
"context"
5-
"database/sql"
65
"errors"
76
"fmt"
87

9-
"github.com/lib/pq"
10-
118
"github.com/smartcontractkit/chainlink-deployments-framework/datastore"
129
)
1310

14-
const (
15-
query_ADDRESS_REFERENCE_BY_ID = `
16-
SELECT * from address_references
17-
WHERE chain_selector = $1
18-
AND contract_type = $2
19-
AND version = $3
20-
AND qualifier = $4`
21-
query_ALL_ADDRESS_REFERENCES = `
22-
SELECT chain_selector, contract_type, version, qualifier, address, label_set
23-
FROM address_references`
24-
query_ADD_ADDRESS_REFERENCE = `
25-
INSERT INTO address_references (chain_selector, contract_type, version, qualifier, address, label_set)
26-
VALUES ($1, $2, $3, $4, $5, $6)`
27-
query_UPSERT_ADDRESS_REFERENCE = query_ADD_ADDRESS_REFERENCE + `
28-
ON CONFLICT ON CONSTRAINT address_references_pkey
29-
DO UPDATE SET address = excluded.address, label_set = excluded.label_set`
30-
query_UPDATE_ADDRESS_REFERENCE = `
31-
UPDATE address_references SET
32-
address = $5,
33-
label_set = $6
34-
WHERE chain_selector = $1
35-
AND contract_type = $2
36-
AND version = $3
37-
AND qualifier = $4`
38-
)
39-
4011
type memoryAddressRefStore struct {
41-
db *dbController
12+
storage *memoryStorage
4213
}
4314

4415
// Ensure memoryAddressRefStore implements the V2 interface
4516
var _ datastore.MutableRefStoreV2[datastore.AddressRefKey, datastore.AddressRef] = &memoryAddressRefStore{}
4617

47-
func newCatalogAddressRefStore(db *dbController) *memoryAddressRefStore {
18+
func newCatalogAddressRefStore(storage *memoryStorage) *memoryAddressRefStore {
4819
return &memoryAddressRefStore{
49-
db: db,
20+
storage: storage,
5021
}
5122
}
5223

@@ -58,75 +29,15 @@ func (s *memoryAddressRefStore) Get(ctx context.Context, key datastore.AddressRe
5829
ignoreTransactions = true
5930
}
6031
}
61-
var db DB
62-
if ignoreTransactions {
63-
db = s.db.base
64-
} else {
65-
db = s.db
66-
}
67-
rows, err := db.QueryContext(ctx, query_ADDRESS_REFERENCE_BY_ID, key.ChainSelector(), key.Type().String(), key.Version().String(), key.Qualifier())
68-
defer func(rows *sql.Rows) {
69-
if rows != nil {
70-
_ = rows.Close()
71-
}
72-
}(rows)
73-
if err != nil {
74-
return datastore.AddressRef{}, err
75-
}
7632

77-
count := 0
78-
row := &datastore.AddressRef{}
79-
for rows.Next() {
80-
count++
81-
array := pq.StringArray{}
82-
err = rows.Scan(&row.ChainSelector, &row.Type, &row.Version, &row.Qualifier, &row.Address, &array)
83-
for _, label := range array {
84-
row.Labels.Add(label)
85-
}
86-
if err != nil {
87-
return datastore.AddressRef{}, err
88-
}
89-
}
33+
compositeKey := addressRefKey(key.ChainSelector(), key.Type().String(), key.Version().String(), key.Qualifier())
9034

91-
switch count {
92-
case 0:
93-
return *row, datastore.ErrAddressRefNotFound
94-
case 1:
95-
return *row, nil
96-
default:
97-
err = fmt.Errorf("expected a single row, got %d", count)
98-
return *row, err
99-
}
35+
return s.storage.getAddressRef(ctx, compositeKey, ignoreTransactions)
10036
}
10137

10238
// Fetch returns a copy of all AddressRefs in the catalog.
10339
func (s *memoryAddressRefStore) Fetch(ctx context.Context) ([]datastore.AddressRef, error) {
104-
rows, err := s.db.QueryContext(ctx, query_ALL_ADDRESS_REFERENCES)
105-
defer func(rows *sql.Rows) {
106-
if rows != nil {
107-
_ = rows.Close()
108-
}
109-
}(rows)
110-
111-
if err != nil {
112-
return nil, err
113-
}
114-
var refs []datastore.AddressRef
115-
116-
for rows.Next() {
117-
row := &datastore.AddressRef{}
118-
array := pq.StringArray{}
119-
err = rows.Scan(&row.ChainSelector, &row.Type, &row.Version, &row.Qualifier, &row.Address, &array)
120-
for _, label := range array {
121-
row.Labels.Add(label)
122-
}
123-
if err != nil {
124-
return refs, err
125-
}
126-
refs = append(refs, *row)
127-
}
128-
129-
return refs, nil
40+
return s.storage.getAllAddressRefs(ctx)
13041
}
13142

13243
// Filter returns a copy of all AddressRef in the catalog that match the provided filter.
@@ -153,45 +64,35 @@ func (s *memoryAddressRefStore) Filter(
15364
}
15465

15566
func (s *memoryAddressRefStore) Add(ctx context.Context, r datastore.AddressRef) error {
156-
return s.edit(ctx, query_ADD_ADDRESS_REFERENCE, r)
67+
compositeKey := addressRefKey(r.ChainSelector, r.Type.String(), r.Version.String(), r.Qualifier)
68+
69+
// Check if the record already exists
70+
_, err := s.storage.getAddressRef(ctx, compositeKey, false)
71+
if err == nil {
72+
return errors.New("address reference already exists")
73+
}
74+
if !errors.Is(err, datastore.ErrAddressRefNotFound) {
75+
return err
76+
}
77+
78+
return s.storage.setAddressRef(ctx, compositeKey, r)
15779
}
15880

15981
func (s *memoryAddressRefStore) Upsert(ctx context.Context, r datastore.AddressRef) error {
160-
return s.edit(ctx, query_UPSERT_ADDRESS_REFERENCE, r)
82+
compositeKey := addressRefKey(r.ChainSelector, r.Type.String(), r.Version.String(), r.Qualifier)
83+
return s.storage.setAddressRef(ctx, compositeKey, r)
16184
}
16285

16386
func (s *memoryAddressRefStore) Update(ctx context.Context, r datastore.AddressRef) error {
164-
return s.edit(ctx, query_UPDATE_ADDRESS_REFERENCE, r)
165-
}
87+
compositeKey := addressRefKey(r.ChainSelector, r.Type.String(), r.Version.String(), r.Qualifier)
16688

167-
func (s *memoryAddressRefStore) edit(ctx context.Context, qry string, r datastore.AddressRef) error {
168-
result, err := s.db.ExecContext(
169-
ctx,
170-
qry,
171-
r.ChainSelector,
172-
r.Type.String(),
173-
r.Version.String(),
174-
r.Qualifier,
175-
r.Address,
176-
pq.StringArray(r.Labels.List()),
177-
)
89+
// Check if the record exists first
90+
_, err := s.storage.getAddressRef(ctx, compositeKey, false)
17891
if err != nil {
17992
return err
18093
}
181-
count, err := result.RowsAffected()
182-
if err != nil {
183-
return err
184-
}
185-
if count != 1 {
186-
switch qry {
187-
case query_UPDATE_ADDRESS_REFERENCE:
188-
return datastore.ErrAddressRefNotFound
189-
default:
190-
return fmt.Errorf("expected 1 row affected, got %d", count)
191-
}
192-
}
19394

194-
return nil
95+
return s.storage.setAddressRef(ctx, compositeKey, r)
19596
}
19697

19798
func (s *memoryAddressRefStore) Delete(_ context.Context, _ datastore.AddressRefKey) error {

datastore/catalog/memory/address_ref_store_test.go

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import (
1414
"github.com/smartcontractkit/chainlink-deployments-framework/datastore"
1515
)
1616

17-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
1817
func TestCatalogAddressRefStore_Get(t *testing.T) {
19-
store, closer := setupTestStore(t)
20-
18+
t.Parallel()
2119
t.Run("not found", func(t *testing.T) {
20+
t.Parallel()
21+
store := setupTestStore(t)
2222
version := semver.MustParse("99.99.99")
2323
key := datastore.NewAddressRefKey(99999999, "NonExistentContract", version, "nonexistent")
2424
_, err := store.Get(t.Context(), key)
@@ -27,6 +27,8 @@ func TestCatalogAddressRefStore_Get(t *testing.T) {
2727
})
2828

2929
t.Run("success", func(t *testing.T) {
30+
t.Parallel()
31+
store := setupTestStore(t)
3032
addressRef := newRandomAddressRef()
3133
err := store.Add(t.Context(), addressRef)
3234
require.NoError(t, err)
@@ -40,11 +42,10 @@ func TestCatalogAddressRefStore_Get(t *testing.T) {
4042
require.Equal(t, addressRef.Address, result.Address)
4143
require.Equal(t, addressRef.Labels, result.Labels)
4244
})
43-
defer closer()
4445
}
4546

46-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
4747
func TestCatalogAddressRefStore_Add(t *testing.T) {
48+
t.Parallel()
4849
tests := []struct {
4950
name string
5051
setup func(store *memoryAddressRefStore) datastore.AddressRef
@@ -74,9 +75,8 @@ func TestCatalogAddressRefStore_Add(t *testing.T) {
7475

7576
for _, tt := range tests {
7677
t.Run(tt.name, func(t *testing.T) {
77-
// Create a fresh store for each test case to avoid concurrency issues
78-
store, closer := setupTestStore(t)
79-
defer closer()
78+
t.Parallel()
79+
store := setupTestStore(t)
8080

8181
addressRef := tt.setup(store)
8282

@@ -108,8 +108,8 @@ func TestCatalogAddressRefStore_Add(t *testing.T) {
108108
}
109109
}
110110

111-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
112111
func TestCatalogAddressRefStore_Update(t *testing.T) {
112+
t.Parallel()
113113
tests := []struct {
114114
name string
115115
setup func(store *memoryAddressRefStore) datastore.AddressRef
@@ -155,9 +155,8 @@ func TestCatalogAddressRefStore_Update(t *testing.T) {
155155

156156
for _, tt := range tests {
157157
t.Run(tt.name, func(t *testing.T) {
158-
// Create a fresh store for each test case to avoid concurrency issues
159-
store, closer := setupTestStore(t)
160-
defer closer()
158+
t.Parallel()
159+
store := setupTestStore(t)
161160

162161
addressRef := tt.setup(store)
163162

@@ -180,8 +179,8 @@ func TestCatalogAddressRefStore_Update(t *testing.T) {
180179
}
181180
}
182181

183-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
184182
func TestCatalogAddressRefStore_Upsert(t *testing.T) {
183+
t.Parallel()
185184
tests := []struct {
186185
name string
187186
setup func(store *memoryAddressRefStore) datastore.AddressRef
@@ -230,9 +229,8 @@ func TestCatalogAddressRefStore_Upsert(t *testing.T) {
230229

231230
for _, tt := range tests {
232231
t.Run(tt.name, func(t *testing.T) {
233-
// Create a fresh store for each test case to avoid concurrency issues
234-
store, closer := setupTestStore(t)
235-
defer closer()
232+
t.Parallel()
233+
store := setupTestStore(t)
236234

237235
addressRef := tt.setup(store)
238236

@@ -246,10 +244,9 @@ func TestCatalogAddressRefStore_Upsert(t *testing.T) {
246244
}
247245
}
248246

249-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
250247
func TestCatalogAddressRefStore_Delete(t *testing.T) {
251-
store, closer := setupTestStore(t)
252-
defer closer()
248+
t.Parallel()
249+
store := setupTestStore(t)
253250

254251
version := semver.MustParse("1.0.0")
255252
key := datastore.NewAddressRefKey(12345, "LinkToken", version, "test")
@@ -262,8 +259,8 @@ func TestCatalogAddressRefStore_Delete(t *testing.T) {
262259
require.Contains(t, err.Error(), "delete operation not supported")
263260
}
264261

265-
//nolint:paralleltest // Subtests share database instance, cannot run in parallel
266262
func TestCatalogAddressRefStore_FetchAndFilter(t *testing.T) {
263+
t.Parallel()
267264
tests := []struct {
268265
name string
269266
operation string
@@ -412,9 +409,8 @@ func TestCatalogAddressRefStore_FetchAndFilter(t *testing.T) {
412409

413410
for _, tt := range tests {
414411
t.Run(tt.name, func(t *testing.T) {
415-
// Create a fresh store for each test case to avoid concurrency issues
416-
store, closer := setupTestStore(t)
417-
defer closer()
412+
t.Parallel()
413+
store := setupTestStore(t)
418414

419415
addressRef1, addressRef2 := tt.setup(store)
420416

@@ -444,14 +440,12 @@ func TestCatalogAddressRefStore_FetchAndFilter(t *testing.T) {
444440
}
445441

446442
// setupTestStore creates a real gRPC client connection to a local service
447-
func setupTestStore(t *testing.T) (*memoryAddressRefStore, func()) {
443+
func setupTestStore(t *testing.T) *memoryAddressRefStore {
448444
t.Helper()
449445
store, err := NewMemoryCatalogDataStore()
450446
require.NoError(t, err)
451447

452-
return store.Addresses().(*memoryAddressRefStore), func() {
453-
require.NoError(t, store.Close())
454-
}
448+
return store.Addresses().(*memoryAddressRefStore)
455449
}
456450

457451
// randomHex generates a random hex string of specified length

0 commit comments

Comments
 (0)