-
Notifications
You must be signed in to change notification settings - Fork 3
CLD-769: Improve catalog client reliability #573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
55fa64f
d4f89db
197a564
43d9545
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "chainlink-deployments-framework": minor | ||
| --- | ||
|
|
||
| add grpc keepalive, retries and connection closure functionality |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,15 +2,36 @@ package remote | |||||
|
|
||||||
| import ( | ||||||
| "context" | ||||||
| "errors" | ||||||
| "fmt" | ||||||
| "sync" | ||||||
| "time" | ||||||
|
|
||||||
| pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore" | ||||||
| "google.golang.org/grpc" | ||||||
| "google.golang.org/grpc/credentials" | ||||||
| "google.golang.org/grpc/keepalive" | ||||||
| "google.golang.org/protobuf/proto" | ||||||
| ) | ||||||
|
|
||||||
| const retryPolicy = `{ | ||||||
| "methodConfig": [{ | ||||||
| "name": [{"service": "op_catalog.v1.datastore.Datastore"}], | ||||||
| "retryPolicy": { | ||||||
| "maxAttempts": 5, | ||||||
| "initialBackoff": "0.1s", | ||||||
| "maxBackoff": "1s", | ||||||
| "backoffMultiplier": 2, | ||||||
| "retryableStatusCodes": [ | ||||||
| "UNAVAILABLE", | ||||||
| "DEADLINE_EXCEEDED", | ||||||
| "INTERNAL", | ||||||
| "RESOURCE_EXHAUSTED" | ||||||
| ] | ||||||
| } | ||||||
| }] | ||||||
| }` | ||||||
|
|
||||||
| type CatalogClient struct { | ||||||
| protoClient pb.DatastoreClient | ||||||
| // ctx is cached here, because we need the context that created the client, not the current | ||||||
|
|
@@ -23,6 +44,7 @@ type CatalogClient struct { | |||||
| // | ||||||
| //nolint:containedctx | ||||||
| ctx context.Context | ||||||
| conn *grpc.ClientConn | ||||||
| cachedStream grpc.BidiStreamingClient[pb.DataAccessRequest, pb.DataAccessResponse] | ||||||
| hmacConfig *HMACAuthConfig | ||||||
| streamInitOnce sync.Once | ||||||
|
|
@@ -55,6 +77,7 @@ func (c *CatalogClient) DataAccess(req proto.Message) (grpc.BidiStreamingClient[ | |||||
| return c.cachedStream, c.streamInitErr | ||||||
| } | ||||||
|
|
||||||
| // CloseStream closes the current stream. | ||||||
| func (c *CatalogClient) CloseStream() error { | ||||||
| if c.cachedStream == nil { | ||||||
| return nil | ||||||
|
|
@@ -68,6 +91,19 @@ func (c *CatalogClient) CloseStream() error { | |||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // Close closes the underlying gRPC connection. | ||||||
| func (c *CatalogClient) Close() error { | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checking do we need to support concurrent call to Close or CloseStream?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so since we only open one stream and then we close it 🤔 I don't think we can use the client concurrently because of the cacheStream
Since we are using the same cached stream wouldn't we need to be able to enforce some sort of ordering in terms of reads and writes? Just thinking if a changeset wants to update something and the use that something for something else if this can happen in parallel then we will have a configuration issue because of the data race 🤔
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make sense, i dont think a stream can be used concurrently, each should open its own stream. |
||||||
| if c.cachedStream != nil { | ||||||
| return errors.New("stream is not closed") | ||||||
|
||||||
| return errors.New("stream is not closed") | |
| return errors.New("cannot close connection while stream is open; call CloseStream() first") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe u may have done it, we should do the same on the server side as well?
eg
grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute, // close connections after idle time
MaxConnectionAge: 30 * time.Minute,
Time: 2 * time.Minute, // send keepalive pings every 2 mins
Timeout: 20 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 30 * time.Second, // minimum time between client pings
PermitWithoutStream: true,
}),
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea!! Will add the same on the server side. I think I did not touch that part on the server side yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok added on the server as well https://github.com/smartcontractkit/op-catalog/pull/171
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i thought retrypolicy is only for unary connection, we are using bidirectional here, how would that work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should only apply to the initial request for the creation of the stream. Retries afterwards cannot be handled automatically. This could solve issues with some network flake that could stop the creation of the stream and therefore stop the pipeline for no reason