Skip to content

Commit 9628e84

Browse files
committed
"Add retry logic for Pub/Sub client creation and improve error handling in GCP plugin"
Signed-off-by: Osmany Montero <osmontero@icloud.com>
1 parent 6f40050 commit 9628e84

1 file changed

Lines changed: 32 additions & 5 deletions

File tree

plugins/gcp/main.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type GroupModule struct {
3333
func main() {
3434
mode := plugins.GetCfg().Env.Mode
3535
if mode != "worker" {
36-
os.Exit(0)
36+
return
3737
}
3838

3939
for i := 0; i < 2*runtime.NumCPU(); i++ {
@@ -50,9 +50,36 @@ func main() {
5050
}
5151

5252
func (g *GroupModule) PullLogs() {
53-
client, err := pubsub.NewClient(g.CTX, g.ProjectID, option.WithCredentialsJSON([]byte(g.JsonKey)))
53+
54+
// Retry logic for creating client
55+
maxRetries := 3
56+
retryDelay := 2 * time.Second
57+
var client *pubsub.Client
58+
var err error
59+
60+
for retry := 0; retry < maxRetries; retry++ {
61+
client, err = pubsub.NewClient(g.CTX, g.ProjectID, option.WithCredentialsJSON([]byte(g.JsonKey)))
62+
if err == nil {
63+
break
64+
}
65+
66+
_ = catcher.Error("failed to create client, retrying", err, map[string]any{
67+
"retry": retry + 1,
68+
"maxRetries": maxRetries,
69+
"group": g.GroupName,
70+
})
71+
72+
if retry < maxRetries-1 {
73+
time.Sleep(retryDelay)
74+
// Increase delay for next retry
75+
retryDelay *= 2
76+
}
77+
}
78+
5479
if err != nil {
55-
_ = catcher.Error("failed to create client", err, map[string]any{})
80+
_ = catcher.Error("all retries failed when creating client", err, map[string]any{
81+
"group": g.GroupName,
82+
})
5683
return
5784
}
5885

@@ -61,8 +88,7 @@ func (g *GroupModule) PullLogs() {
6188
sub := client.Subscription(g.SubscriptionID)
6289

6390
for {
64-
65-
err = sub.Receive(g.CTX, func(ctx context.Context, msg *pubsub.Message) {
91+
err := sub.Receive(g.CTX, func(ctx context.Context, msg *pubsub.Message) {
6692
plugins.EnqueueLog(&plugins.Log{
6793
Id: uuid.NewString(),
6894
TenantId: defaultTenant,
@@ -77,6 +103,7 @@ func (g *GroupModule) PullLogs() {
77103

78104
if err != nil {
79105
_ = catcher.Error("failed to receive message", err, map[string]any{})
106+
time.Sleep(5 * time.Second)
80107
continue
81108
}
82109
}

0 commit comments

Comments
 (0)