From 55fa64fc758a15542e628828d74238f3c2869736 Mon Sep 17 00:00:00 2001 From: DimitriosNaikopoulos Date: Wed, 12 Nov 2025 17:47:27 +0000 Subject: [PATCH 1/3] CLD-769: Improve catalog client reliability --- datastore/catalog/remote/grpc.go | 45 +++++++++++++++++++++++++++ datastore/catalog/remote/grpc_test.go | 14 +++++++++ 2 files changed, 59 insertions(+) diff --git a/datastore/catalog/remote/grpc.go b/datastore/catalog/remote/grpc.go index ecb75fc04..966415d20 100644 --- a/datastore/catalog/remote/grpc.go +++ b/datastore/catalog/remote/grpc.go @@ -4,13 +4,33 @@ import ( "context" "fmt" "sync" + "time" pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/proto" ) +const retryPolicy = `{ + "methodConfig": [{ + "name": [{"service": "op_catalog.v1.datastore.Datastore"}], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.1s", + "maxBackoff": "1s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE", + "DEADLINE_EXCEEDED", + "INTERNAL", + "RESOURCE_EXHAUSTED" + ] + } + }] +}` + type CatalogClient struct { protoClient pb.DatastoreClient // ctx is cached here, because we need the context that created the client, not the current @@ -23,6 +43,7 @@ type CatalogClient struct { // //nolint:containedctx ctx context.Context + conn *grpc.ClientConn cachedStream grpc.BidiStreamingClient[pb.DataAccessRequest, pb.DataAccessResponse] hmacConfig *HMACAuthConfig streamInitOnce sync.Once @@ -55,6 +76,7 @@ func (c *CatalogClient) DataAccess(req proto.Message) (grpc.BidiStreamingClient[ return c.cachedStream, c.streamInitErr } +// CloseStream closes the current stream. func (c *CatalogClient) CloseStream() error { if c.cachedStream == nil { return nil @@ -68,6 +90,18 @@ func (c *CatalogClient) CloseStream() error { return nil } +// Close closes the underlying gRPC connection. +func (c *CatalogClient) Close() error { + if c.cachedStream != nil { + return fmt.Errorf("stream is not closed") + } + + if c.conn != nil { + return c.conn.Close() + } + return nil +} + type CatalogConfig struct { GRPC string Creds credentials.TransportCredentials @@ -98,6 +132,7 @@ func NewCatalogClient(ctx context.Context, cfg CatalogConfig) (*CatalogClient, e client := CatalogClient{ ctx: ctx, hmacConfig: cfg.HMACConfig, + conn: conn, protoClient: pb.NewDatastoreClient(conn), } @@ -122,6 +157,16 @@ func newCatalogConnection(cfg CatalogConfig) (*grpc.ClientConn, error) { opts = append(opts, grpc.WithAuthority(cfg.HMACConfig.Authority)) } + // Keepalive for long-lived bidirectional streams + // Ping every 20 seconds, wait up to 10 seconds for a response + opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 20 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: true, + })) + + opts = append(opts, grpc.WithDefaultServiceConfig(retryPolicy)) + conn, err := grpc.NewClient(cfg.GRPC, opts...) if err != nil { return nil, err diff --git a/datastore/catalog/remote/grpc_test.go b/datastore/catalog/remote/grpc_test.go index dde5f4dae..e3288f9d5 100644 --- a/datastore/catalog/remote/grpc_test.go +++ b/datastore/catalog/remote/grpc_test.go @@ -70,3 +70,17 @@ func TestNewCatalogClient_Success(t *testing.T) { }) } } + +func TestCatalogClient_Close(t *testing.T) { + t.Parallel() + + client, err := remote.NewCatalogClient(t.Context(), remote.CatalogConfig{ + GRPC: "localhost:9090", + Creds: insecure.NewCredentials(), + }) + + require.NoError(t, err) + require.NotNil(t, client) + + require.NoError(t, client.Close()) +} From d4f89dba7b72fb49825deb4bd7ddde26b2b0811e Mon Sep 17 00:00:00 2001 From: DimitriosNaikopoulos Date: Wed, 12 Nov 2025 17:59:48 +0000 Subject: [PATCH 2/3] CLD-769: Lint --- datastore/catalog/remote/grpc.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datastore/catalog/remote/grpc.go b/datastore/catalog/remote/grpc.go index 966415d20..d1d3fe19e 100644 --- a/datastore/catalog/remote/grpc.go +++ b/datastore/catalog/remote/grpc.go @@ -2,6 +2,7 @@ package remote import ( "context" + "errors" "fmt" "sync" "time" @@ -93,12 +94,13 @@ func (c *CatalogClient) CloseStream() error { // Close closes the underlying gRPC connection. func (c *CatalogClient) Close() error { if c.cachedStream != nil { - return fmt.Errorf("stream is not closed") + return errors.New("stream is not closed") } if c.conn != nil { return c.conn.Close() } + return nil } From 197a564d46f5bd07c1d10beda2d293d23da7e079 Mon Sep 17 00:00:00 2001 From: DimitriosNaikopoulos Date: Thu, 13 Nov 2025 15:53:05 +0000 Subject: [PATCH 3/3] CLD-769: add changeset --- .changeset/eleven-dragons-laugh.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/eleven-dragons-laugh.md diff --git a/.changeset/eleven-dragons-laugh.md b/.changeset/eleven-dragons-laugh.md new file mode 100644 index 000000000..9fa8ca8cc --- /dev/null +++ b/.changeset/eleven-dragons-laugh.md @@ -0,0 +1,5 @@ +--- +"chainlink-deployments-framework": minor +--- + +add grpc keepalive, retries and connection closure functionality