Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eleven-dragons-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

add grpc keepalive, retries and connection closure functionality
47 changes: 47 additions & 0 deletions datastore/catalog/remote/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,36 @@ package remote

import (
"context"
"errors"
"fmt"
"sync"
"time"

pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/proto"
)

const retryPolicy = `{

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought retrypolicy is only for unary connection, we are using bidirectional here, how would that work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only apply to the initial request for the creation of the stream. Retries afterwards cannot be handled automatically. This could solve issues with some network flake that could stop the creation of the stream and therefore stop the pipeline for no reason

"methodConfig": [{
"name": [{"service": "op_catalog.v1.datastore.Datastore"}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"DEADLINE_EXCEEDED",
"INTERNAL",
"RESOURCE_EXHAUSTED"
]
}
}]
}`

type CatalogClient struct {
protoClient pb.DatastoreClient
// ctx is cached here, because we need the context that created the client, not the current
Expand All @@ -23,6 +44,7 @@ type CatalogClient struct {
//
//nolint:containedctx
ctx context.Context
conn *grpc.ClientConn
cachedStream grpc.BidiStreamingClient[pb.DataAccessRequest, pb.DataAccessResponse]
hmacConfig *HMACAuthConfig
streamInitOnce sync.Once
Expand Down Expand Up @@ -55,6 +77,7 @@ func (c *CatalogClient) DataAccess(req proto.Message) (grpc.BidiStreamingClient[
return c.cachedStream, c.streamInitErr
}

// CloseStream closes the current stream.
func (c *CatalogClient) CloseStream() error {
if c.cachedStream == nil {
return nil
Expand All @@ -68,6 +91,19 @@ func (c *CatalogClient) CloseStream() error {
return nil
}

// Close closes the underlying gRPC connection.
func (c *CatalogClient) Close() error {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking do we need to support concurrent call to Close or CloseStream?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so since we only open one stream and then we close it 🤔 I don't think we can use the client concurrently because of the cacheStream

.

Since we are using the same cached stream wouldn't we need to be able to enforce some sort of ordering in terms of reads and writes?

Just thinking if a changeset wants to update something and the use that something for something else if this can happen in parallel then we will have a configuration issue because of the data race 🤔

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, i dont think a stream can be used concurrently, each should open its own stream.

if c.cachedStream != nil {
return errors.New("stream is not closed")

Copilot AI Nov 13, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'stream is not closed' could be clearer about the expected action. Consider changing it to 'cannot close connection while stream is open; call CloseStream() first' to better guide the user on how to resolve this issue.

Suggested change
return errors.New("stream is not closed")
return errors.New("cannot close connection while stream is open; call CloseStream() first")

Copilot uses AI. Check for mistakes.
}

if c.conn != nil {
return c.conn.Close()
}

return nil
}

type CatalogConfig struct {
GRPC string
Creds credentials.TransportCredentials
Expand Down Expand Up @@ -98,6 +134,7 @@ func NewCatalogClient(ctx context.Context, cfg CatalogConfig) (*CatalogClient, e
client := CatalogClient{
ctx: ctx,
hmacConfig: cfg.HMACConfig,
conn: conn,
protoClient: pb.NewDatastoreClient(conn),
}

Expand All @@ -122,6 +159,16 @@ func newCatalogConnection(cfg CatalogConfig) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithAuthority(cfg.HMACConfig.Authority))
}

// Keepalive for long-lived bidirectional streams
// Ping every 20 seconds, wait up to 10 seconds for a response
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{

@graham-chainlink graham-chainlink Nov 14, 2025

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe u may have done it, we should do the same on the server side as well?

eg

grpc.NewServer(
    grpc.KeepaliveParams(keepalive.ServerParameters{
        MaxConnectionIdle: 5 * time.Minute, // close connections after idle time
        MaxConnectionAge:  30 * time.Minute,
        Time:              2 * time.Minute, // send keepalive pings every 2 mins
        Timeout:           20 * time.Second,
    }),
    grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
        MinTime:             30 * time.Second, // minimum time between client pings
        PermitWithoutStream: true,
    }),
)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!! Will add the same on the server side. I think I did not touch that part on the server side yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))

opts = append(opts, grpc.WithDefaultServiceConfig(retryPolicy))

conn, err := grpc.NewClient(cfg.GRPC, opts...)
if err != nil {
return nil, err
Expand Down
14 changes: 14 additions & 0 deletions datastore/catalog/remote/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,17 @@ func TestNewCatalogClient_Success(t *testing.T) {
})
}
}

func TestCatalogClient_Close(t *testing.T) {
t.Parallel()

client, err := remote.NewCatalogClient(t.Context(), remote.CatalogConfig{
GRPC: "localhost:9090",
Creds: insecure.NewCredentials(),
})

require.NoError(t, err)
require.NotNil(t, client)

require.NoError(t, client.Close())
}
Loading