diff --git a/.changeset/eleven-dragons-laugh.md b/.changeset/eleven-dragons-laugh.md
new file mode 100644
index 000000000..9fa8ca8cc
--- /dev/null
+++ b/.changeset/eleven-dragons-laugh.md
@@ -0,0 +1,5 @@
+---
+"chainlink-deployments-framework": minor
+---
+
+add grpc keepalive, retries and connection closure functionality
diff --git a/datastore/catalog/remote/grpc.go b/datastore/catalog/remote/grpc.go
index ecb75fc04..d1d3fe19e 100644
--- a/datastore/catalog/remote/grpc.go
+++ b/datastore/catalog/remote/grpc.go
@@ -2,15 +2,36 @@ package remote
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/keepalive"
 	"google.golang.org/protobuf/proto"
 )
 
+const retryPolicy = `{
+	"methodConfig": [{
+		"name": [{"service": "op_catalog.v1.datastore.Datastore"}],
+		"retryPolicy": {
+			"maxAttempts": 5,
+			"initialBackoff": "0.1s",
+			"maxBackoff": "1s",
+			"backoffMultiplier": 2,
+			"retryableStatusCodes": [
+				"UNAVAILABLE",
+				"DEADLINE_EXCEEDED",
+				"INTERNAL",
+				"RESOURCE_EXHAUSTED"
+			]
+		}
+	}]
+}`
+
 type CatalogClient struct {
 	protoClient pb.DatastoreClient
 	// ctx is cached here, because we need the context that created the client, not the current
@@ -23,6 +44,7 @@ type CatalogClient struct {
 	//
 	//nolint:containedctx
 	ctx            context.Context
+	conn           *grpc.ClientConn
 	cachedStream   grpc.BidiStreamingClient[pb.DataAccessRequest, pb.DataAccessResponse]
 	hmacConfig     *HMACAuthConfig
 	streamInitOnce sync.Once
@@ -55,6 +77,7 @@ func (c *CatalogClient) DataAccess(req proto.Message) (grpc.BidiStreamingClient[
 	return c.cachedStream, c.streamInitErr
 }
 
+// CloseStream closes the current stream.
 func (c *CatalogClient) CloseStream() error {
 	if c.cachedStream == nil {
 		return nil
@@ -68,6 +91,19 @@ func (c *CatalogClient) CloseStream() error {
 	return nil
 }
 
+// Close closes the underlying gRPC connection. It returns an error while a cached stream is still set.
+func (c *CatalogClient) Close() error {
+	if c.cachedStream != nil {
+		return errors.New("stream is not closed")
+	}
+
+	if c.conn != nil {
+		return c.conn.Close()
+	}
+
+	return nil
+}
+
 type CatalogConfig struct {
 	GRPC       string
 	Creds      credentials.TransportCredentials
@@ -98,6 +134,7 @@ func NewCatalogClient(ctx context.Context, cfg CatalogConfig) (*CatalogClient, e
 	client := CatalogClient{
 		ctx:         ctx,
 		hmacConfig:  cfg.HMACConfig,
+		conn:        conn,
 		protoClient: pb.NewDatastoreClient(conn),
 	}
 
@@ -122,6 +159,19 @@ func newCatalogConnection(cfg CatalogConfig) (*grpc.ClientConn, error) {
 		opts = append(opts, grpc.WithAuthority(cfg.HMACConfig.Authority))
 	}
 
+	// Keepalive for long-lived bidirectional streams
+	// Ping every 20 seconds, wait up to 10 seconds for a response
+	// NOTE(review): pings this frequent, sent even without an active stream, are only
+	// accepted if the server's keepalive enforcement policy permits them; otherwise the
+	// server may drop the connection (too_many_pings) - confirm the catalog server config.
+	opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
+		Time:                20 * time.Second,
+		Timeout:             10 * time.Second,
+		PermitWithoutStream: true,
+	}))
+
+	opts = append(opts, grpc.WithDefaultServiceConfig(retryPolicy))
+
 	conn, err := grpc.NewClient(cfg.GRPC, opts...)
 	if err != nil {
 		return nil, err
diff --git a/datastore/catalog/remote/grpc_test.go b/datastore/catalog/remote/grpc_test.go
index dde5f4dae..e3288f9d5 100644
--- a/datastore/catalog/remote/grpc_test.go
+++ b/datastore/catalog/remote/grpc_test.go
@@ -70,3 +70,17 @@ func TestNewCatalogClient_Success(t *testing.T) {
 		})
 	}
 }
+
+func TestCatalogClient_Close(t *testing.T) {
+	t.Parallel()
+
+	client, err := remote.NewCatalogClient(t.Context(), remote.CatalogConfig{
+		GRPC:  "localhost:9090",
+		Creds: insecure.NewCredentials(),
+	})
+
+	require.NoError(t, err)
+	require.NotNil(t, client)
+
+	require.NoError(t, client.Close())
+}