Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/flat-moose-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

feat: adds HMAC authentication support for catalog remote
70 changes: 35 additions & 35 deletions datastore/catalog/remote/address_ref_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,26 @@ func (s *catalogAddressRefStore) get(
ignoreTransaction bool,
key datastore.AddressRefKey,
) (datastore.AddressRef, error) {
// Create a bidirectional stream
stream, err := s.client.DataAccess()
if err != nil {
return datastore.AddressRef{}, fmt.Errorf("failed to create data access stream: %w", err)
}

// Create the find request with the key converted to a filter
filter := s.keyToFilter(key)
findRequest := &pb.AddressReferenceFindRequest{
KeyFilter: filter,
IgnoreTransaction: ignoreTransaction,
}

// Send the request
// Create the request
request := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceFindRequest{
AddressReferenceFindRequest: findRequest,
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(request)
if err != nil {
return datastore.AddressRef{}, fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(request); sendErr != nil {
return datastore.AddressRef{}, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -120,12 +120,6 @@ func (s *catalogAddressRefStore) get(

// Fetch returns a copy of all AddressRef in the catalog.
func (s *catalogAddressRefStore) Fetch(_ context.Context) ([]datastore.AddressRef, error) {
// Create a bidirectional stream
stream, err := s.client.DataAccess()
if err != nil {
return nil, fmt.Errorf("failed to create data access stream: %w", err)
}

// Create the find request with an empty filter to get all records
// We only filter by domain and environment to get all records for this store's scope
filter := &pb.AddressReferenceKeyFilter{
Expand All @@ -138,13 +132,19 @@ func (s *catalogAddressRefStore) Fetch(_ context.Context) ([]datastore.AddressRe
KeyFilter: filter,
}

// Send the request
// Create the request
request := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceFindRequest{
AddressReferenceFindRequest: findRequest,
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(request)
if err != nil {
return nil, fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(request); sendErr != nil {
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -212,12 +212,6 @@ func (s *catalogAddressRefStore) Filter(
}

func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.AddressRef) error {
// Create a bidirectional stream
stream, err := s.client.DataAccess()
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

Expand All @@ -227,13 +221,19 @@ func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.Address
Semantics: pb.EditSemantics_SEMANTICS_INSERT,
}

// Send the edit request
// Create the request
editReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(editReq)
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}
Expand All @@ -259,12 +259,6 @@ func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.Address
}

func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.AddressRef) error {
// Create a bidirectional stream
stream, err := s.client.DataAccess()
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

Expand All @@ -274,13 +268,19 @@ func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.Addr
Semantics: pb.EditSemantics_SEMANTICS_UPSERT,
}

// Send the edit request
// Create the request
request := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, err := s.client.DataAccess(request)
if err != nil {
return fmt.Errorf("failed to create data access stream: %w", err)
}

if sendErr := stream.Send(request); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}
Expand Down Expand Up @@ -319,12 +319,6 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad
}

// Record exists, proceed with updating it
// Create a bidirectional stream
stream, streamErr := s.client.DataAccess()
if streamErr != nil {
return fmt.Errorf("failed to create data access stream: %w", streamErr)
}

// Convert the datastore record to protobuf
protoRef := s.addressRefToProto(record)

Expand All @@ -334,13 +328,19 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad
Semantics: pb.EditSemantics_SEMANTICS_UPDATE,
}

// Send the edit request
// Create the request
editReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{
AddressReferenceEditRequest: editRequest,
},
}

// Create a bidirectional stream with the initial request for HMAC
stream, streamErr := s.client.DataAccess(editReq)
if streamErr != nil {
return fmt.Errorf("failed to create data access stream: %w", streamErr)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/catalog/remote/address_ref_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func setupTestStore(t *testing.T, domain, environment string) *catalogAddressRef
}

// Test if the service is actually available by making a simple call
_, err = catalogClient.DataAccess()
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
if err != nil {
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
return nil
Expand Down
39 changes: 21 additions & 18 deletions datastore/catalog/remote/chain_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ func (s *catalogChainMetadataStore) Get(
}

func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.ChainMetadataKey) (datastore.ChainMetadata, error) {
stream, err := s.client.DataAccess()
if err != nil {
return datastore.ChainMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Send find request
// Create find request
findReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ChainMetadataFindRequest{
ChainMetadataFindRequest: &pb.ChainMetadataFindRequest{
Expand All @@ -134,6 +129,12 @@ func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.Ch
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(findReq)
if err != nil {
return datastore.ChainMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(findReq); sendErr != nil {
return datastore.ChainMetadata{}, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -181,12 +182,7 @@ func (s *catalogChainMetadataStore) get(ignoreTransaction bool, key datastore.Ch

// Fetch returns a copy of all ChainMetadata in the catalog.
func (s *catalogChainMetadataStore) Fetch(_ context.Context) ([]datastore.ChainMetadata, error) {
stream, err := s.client.DataAccess()
if err != nil {
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Send find request with domain and environment filter only (fetch all)
// Create find request with domain and environment filter only (fetch all)
findReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ChainMetadataFindRequest{
ChainMetadataFindRequest: &pb.ChainMetadataFindRequest{
Expand All @@ -198,6 +194,12 @@ func (s *catalogChainMetadataStore) Fetch(_ context.Context) ([]datastore.ChainM
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(findReq)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(findReq); sendErr != nil {
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -347,16 +349,11 @@ func (s *catalogChainMetadataStore) Delete(_ context.Context, _ datastore.ChainM

// editRecord is a helper method that handles Add, Upsert, and Update operations
func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, semantics pb.EditSemantics) error {
stream, err := s.client.DataAccess()
if err != nil {
return fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Get the current version for this record
key := record.Key()
version := s.getVersion(key)

// Send edit request
// Create edit request
editReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ChainMetadataEditRequest{
ChainMetadataEditRequest: &pb.ChainMetadataEditRequest{
Expand All @@ -366,6 +363,12 @@ func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, s
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(editReq)
if err != nil {
return fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/catalog/remote/chain_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func setupTestChainStore(t *testing.T, domain, environment string) *catalogChain
}

// Test if the service is actually available by making a simple call
_, err = catalogClient.DataAccess()
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
if err != nil {
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
return nil
Expand Down
39 changes: 21 additions & 18 deletions datastore/catalog/remote/contract_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,7 @@ func (s *catalogContractMetadataStore) Get(
}

func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore.ContractMetadataKey) (datastore.ContractMetadata, error) {
stream, err := s.client.DataAccess()
if err != nil {
return datastore.ContractMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Send find request
// Create find request
findReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ContractMetadataFindRequest{
ContractMetadataFindRequest: &pb.ContractMetadataFindRequest{
Expand All @@ -138,6 +133,12 @@ func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(findReq)
if err != nil {
return datastore.ContractMetadata{}, fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(findReq); sendErr != nil {
return datastore.ContractMetadata{}, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -185,12 +186,7 @@ func (s *catalogContractMetadataStore) get(ignoreTransaction bool, key datastore

// Fetch returns a copy of all ContractMetadata in the catalog.
func (s *catalogContractMetadataStore) Fetch(_ context.Context) ([]datastore.ContractMetadata, error) {
stream, err := s.client.DataAccess()
if err != nil {
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Send find request with domain and environment filter only (fetch all)
// Create find request with domain and environment filter only (fetch all)
findReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ContractMetadataFindRequest{
ContractMetadataFindRequest: &pb.ContractMetadataFindRequest{
Expand All @@ -202,6 +198,12 @@ func (s *catalogContractMetadataStore) Fetch(_ context.Context) ([]datastore.Con
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(findReq)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(findReq); sendErr != nil {
return nil, fmt.Errorf("failed to send find request: %w", sendErr)
}
Expand Down Expand Up @@ -354,16 +356,11 @@ func (s *catalogContractMetadataStore) Delete(_ context.Context, _ datastore.Con

// editRecord is a helper method that handles Add, Upsert, and Update operations
func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetadata, semantics pb.EditSemantics) error {
stream, err := s.client.DataAccess()
if err != nil {
return fmt.Errorf("failed to create gRPC stream: %w", err)
}

// Get the current version for this record
key := record.Key()
version := s.getVersion(key)

// Send edit request
// Create edit request
editReq := &pb.DataAccessRequest{
Operation: &pb.DataAccessRequest_ContractMetadataEditRequest{
ContractMetadataEditRequest: &pb.ContractMetadataEditRequest{
Expand All @@ -373,6 +370,12 @@ func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetad
},
}

// Create stream with the initial request for HMAC
stream, err := s.client.DataAccess(editReq)
if err != nil {
return fmt.Errorf("failed to create gRPC stream: %w", err)
}

if sendErr := stream.Send(editReq); sendErr != nil {
return fmt.Errorf("failed to send edit request: %w", sendErr)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/catalog/remote/contract_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func setupTestContractStore(t *testing.T, domain, environment string) *catalogCo
}

// Test if the gRPC service is actually available by making a simple call
_, err = catalogClient.DataAccess()
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
if err != nil {
t.Skipf("gRPC service not available at %s: %v. Skipping integration tests.", address, err)
return nil
Expand Down
5 changes: 4 additions & 1 deletion datastore/catalog/remote/datastore_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc/credentials/insecure"

"github.com/smartcontractkit/chainlink-deployments-framework/datastore"

pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore"
)

//nolint:paralleltest
Expand Down Expand Up @@ -448,7 +451,7 @@ func setupStore(t *testing.T, ctx context.Context) (*catalogDataStore, error) {
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w. Skipping integration tests", address, err)
}
// Test if the service is actually available by making a simple call
_, err = catalogClient.DataAccess()
_, err = catalogClient.DataAccess(&pb.DataAccessRequest{})
if err != nil {
return nil, fmt.Errorf("gRPC service not available at %s: %w. Skipping integration tests", address, err)
}
Expand Down
Loading
Loading