Skip to content

Commit 0b5da94

Browse files
committed
"Implement retry logic for key operations to enhance resilience and error handling in plugin initialization."
Signed-off-by: Osmany Montero <osmontero@icloud.com>
1 parent 29e20c4 commit 0b5da94

5 files changed

Lines changed: 185 additions & 58 deletions

File tree

plugins/inputs/auth.go

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -3,9 +3,9 @@ package main
33
import (
44
"context"
55
"crypto/tls"
6+
"fmt"
67
"github.com/threatwinds/go-sdk/catcher"
78
"github.com/threatwinds/go-sdk/plugins"
8-
"os"
99
"strings"
1010
"sync"
1111
"time"
@@ -16,7 +16,7 @@ import (
1616
"google.golang.org/grpc/metadata"
1717
)
1818

19-
const maxMessageSize = 1024 * 1024 * 1024
19+
const maxMessageSize = 20 * 1024 * 1024 // 20MB
2020

2121
type LogAuthService struct {
2222
Mutex *sync.Mutex
@@ -57,8 +57,9 @@ func (auth *LogAuthService) syncKeys(typ agent.ConnectorType) {
5757
internalKey := pConfig.Get("internalKey").String()
5858

5959
if agentManager == "" {
60-
_ = catcher.Error("agentManager config is empty", nil, nil)
61-
os.Exit(1)
60+
_ = catcher.Error("Could not sync keys. This is a common occurrence during the startup process and typically resolves on its own after a short while.", fmt.Errorf("configuration is empty"), nil)
61+
// Don't exit, just return and retry later
62+
return
6263
}
6364

6465
tlsConfig := &tls.Config{
@@ -68,7 +69,7 @@ func (auth *LogAuthService) syncKeys(typ agent.ConnectorType) {
6869
tlsCredentials := credentials.NewTLS(tlsConfig)
6970
conn, err := grpc.NewClient(agentManager, grpc.WithTransportCredentials(tlsCredentials), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)))
7071
if err != nil {
71-
_ = catcher.Error("cannot to connect to agent manager", err, nil)
72+
_ = catcher.Error("Could not sync keys. This is a common occurrence during the startup process and typically resolves on its own after a short while.", err, nil)
7273
return
7374
}
7475
defer func() {

plugins/inputs/handlers.go

Lines changed: 77 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/threatwinds/go-sdk/utils"
88
"net"
99
"net/http"
10-
"os"
1110
"time"
1211

1312
"github.com/gin-gonic/gin"
@@ -19,6 +18,10 @@ import (
1918
)
2019

2120
func startHTTPServer(middlewares *Middlewares, cert string, key string) {
21+
// Retry logic for starting HTTP server
22+
maxRetries := 3
23+
retryDelay := 2 * time.Second
24+
2225
gin.SetMode(gin.ReleaseMode)
2326

2427
router := gin.Default()
@@ -27,10 +30,26 @@ func startHTTPServer(middlewares *Middlewares, cert string, key string) {
2730
router.GET("/v1/ping", Ping)
2831
router.GET("/v1/health", func(c *gin.Context) { c.Status(http.StatusOK) })
2932

30-
err := router.RunTLS(":8080", cert, key)
31-
if err != nil {
32-
_ = catcher.Error("failed to start http server", err, nil)
33-
os.Exit(1)
33+
for retry := 0; retry < maxRetries; retry++ {
34+
err := router.RunTLS(":8080", cert, key)
35+
if err == nil {
36+
break
37+
}
38+
39+
_ = catcher.Error("failed to start http server, retrying", err, map[string]any{
40+
"retry": retry + 1,
41+
"maxRetries": maxRetries,
42+
})
43+
44+
if retry < maxRetries-1 {
45+
time.Sleep(retryDelay)
46+
// Increase delay for next retry
47+
retryDelay *= 2
48+
} else {
49+
// If all retries failed, log the error and return
50+
_ = catcher.Error("all retries failed when starting http server", err, nil)
51+
return
52+
}
3453
}
3554
}
3655

@@ -115,11 +134,32 @@ type integration struct {
115134
plugins.UnimplementedIntegrationServer
116135
}
117136

118-
func startGRPCServer(middlewares *Middlewares, cert string, key string) {
119-
transportCredentials, err := credentials.NewServerTLSFromFile(cert, key)
120-
if err != nil {
121-
_ = catcher.Error("failed to create credentials", err, nil)
122-
os.Exit(1)
137+
func startGRPCServer(middlewares *Middlewares, cert string, key string) error {
138+
// Retry logic for creating credentials
139+
maxRetries := 3
140+
retryDelay := 2 * time.Second
141+
var transportCredentials credentials.TransportCredentials
142+
var err error
143+
144+
for retry := 0; retry < maxRetries; retry++ {
145+
transportCredentials, err = credentials.NewServerTLSFromFile(cert, key)
146+
if err == nil {
147+
break
148+
}
149+
150+
_ = catcher.Error("failed to create credentials, retrying", err, map[string]any{
151+
"retry": retry + 1,
152+
"maxRetries": maxRetries,
153+
})
154+
155+
if retry < maxRetries-1 {
156+
time.Sleep(retryDelay)
157+
// Increase delay for next retry
158+
retryDelay *= 2
159+
} else {
160+
// If all retries failed, log the error and return
161+
return catcher.Error("all retries failed when creating credentials", err, nil)
162+
}
123163
}
124164

125165
server := grpc.NewServer(
@@ -135,16 +175,37 @@ func startGRPCServer(middlewares *Middlewares, cert string, key string) {
135175
grpcHealth.RegisterHealthServer(server, healthServer)
136176
healthServer.SetServingStatus("", grpcHealth.HealthCheckResponse_SERVING)
137177

138-
listener, err := net.Listen("tcp", "0.0.0.0:50051")
139-
if err != nil {
140-
_ = catcher.Error("failed to listen to grpc", err, nil)
141-
os.Exit(1)
178+
// Retry logic for listening to gRPC
179+
retryDelay = 2 * time.Second
180+
var listener net.Listener
181+
182+
for retry := 0; retry < maxRetries; retry++ {
183+
listener, err = net.Listen("tcp", "0.0.0.0:50051")
184+
if err == nil {
185+
break
186+
}
187+
188+
_ = catcher.Error("failed to listen to grpc, retrying", err, map[string]any{
189+
"retry": retry + 1,
190+
"maxRetries": maxRetries,
191+
})
192+
193+
if retry < maxRetries-1 {
194+
time.Sleep(retryDelay)
195+
// Increase delay for next retry
196+
retryDelay *= 2
197+
} else {
198+
// If all retries failed, log the error and return
199+
return catcher.Error("all retries failed when listening to grpc", err, nil)
200+
}
142201
}
143202

203+
// Serve with error handling
144204
if err := server.Serve(listener); err != nil {
145-
_ = catcher.Error("failed to serve grpc", err, nil)
146-
os.Exit(1)
205+
return catcher.Error("failed to serve grpc", err, nil)
147206
}
207+
208+
return nil
148209
}
149210

150211
func (i *integration) ProcessLog(srv plugins.Integration_ProcessLogServer) error {

plugins/inputs/health.go

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@ package main
33
import (
44
"context"
55
"crypto/tls"
6+
"fmt"
7+
"github.com/threatwinds/go-sdk/catcher"
68
"github.com/threatwinds/go-sdk/plugins"
79
"time"
810

@@ -12,6 +14,8 @@ import (
1214
"google.golang.org/grpc/metadata"
1315
)
1416

17+
const healthMaxMessageSize = 5 * 1024 * 1024 // 5MB
18+
1519
func CheckAgentManagerHealth() {
1620
tlsConfig := &tls.Config{
1721
InsecureSkipVerify: true,
@@ -24,11 +28,14 @@ func CheckAgentManagerHealth() {
2428
internalKey := pConfig.Get("internalKey").String()
2529

2630
if agentManager == "" {
27-
panic("agentManager config is empty")
31+
_ = catcher.Error("Could not connect to the Agent Manager. This is a common occurrence during the startup process and typically resolves on its own after a short while.", fmt.Errorf("configuration is empty"), nil)
32+
time.Sleep(5 * time.Second)
33+
continue
2834
}
2935

30-
conn, err := grpc.NewClient(agentManager, grpc.WithTransportCredentials(tlsCredentials), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)))
36+
conn, err := grpc.NewClient(agentManager, grpc.WithTransportCredentials(tlsCredentials), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(healthMaxMessageSize)))
3137
if err != nil {
38+
_ = catcher.Error("Could not connect to the Agent Manager. This is a common occurrence during the startup process and typically resolves on its own after a short while.", err, nil)
3239
time.Sleep(5 * time.Second)
3340
continue
3441
}
@@ -43,6 +50,7 @@ func CheckAgentManagerHealth() {
4350
if err != nil {
4451
cancel()
4552
_ = conn.Close()
53+
_ = catcher.Error("Could not connect to the Agent Manager. This is a common occurrence during the startup process and typically resolves on its own after a short while.", err, nil)
4654
time.Sleep(5 * time.Second)
4755
continue
4856
}
@@ -52,9 +60,5 @@ func CheckAgentManagerHealth() {
5260
_ = conn.Close()
5361
break
5462
}
55-
56-
cancel()
57-
_ = conn.Close()
58-
time.Sleep(5 * time.Second)
5963
}
6064
}

plugins/inputs/main.go

Lines changed: 32 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/threatwinds/go-sdk/utils"
88
"os"
99
"runtime"
10+
"time"
1011
)
1112

1213
const defaultTenant string = "ce66672c-e36d-4761-a8c8-90058fee1a24"
@@ -16,20 +17,44 @@ var localLogsChannel chan *plugins.Log
1617
func main() {
1718
mode := plugins.GetCfg().Env.Mode
1819
if mode != "worker" {
19-
os.Exit(0)
20+
return
2021
}
2122

2223
CheckAgentManagerHealth()
2324

2425
autService := NewLogAuthService()
25-
go autService.SyncAuth()
26+
go func() {
27+
autService.SyncAuth()
28+
}()
2629

2730
middlewares := NewMiddlewares(autService)
2831

29-
cert, key, err := loadCerts()
30-
if err != nil {
31-
_ = catcher.Error("cannot load certificates", err, nil)
32-
os.Exit(1)
32+
// Retry logic for loading certificates
33+
maxRetries := 3
34+
retryDelay := 2 * time.Second
35+
var cert, key string
36+
var err error
37+
38+
for retry := 0; retry < maxRetries; retry++ {
39+
cert, key, err = loadCerts()
40+
if err == nil {
41+
break
42+
}
43+
44+
_ = catcher.Error("cannot load certificates, retrying", err, map[string]any{
45+
"retry": retry + 1,
46+
"maxRetries": maxRetries,
47+
})
48+
49+
if retry < maxRetries-1 {
50+
time.Sleep(retryDelay)
51+
// Increase delay for next retry
52+
retryDelay *= 2
53+
} else {
54+
// If all retries failed, log the error and return
55+
_ = catcher.Error("all retries failed when loading certificates", err, nil)
56+
return
57+
}
3358
}
3459

3560
cpu := runtime.NumCPU()
@@ -41,7 +66,7 @@ func main() {
4166
}
4267

4368
go startHTTPServer(middlewares, cert, key)
44-
startGRPCServer(middlewares, cert, key)
69+
_ = startGRPCServer(middlewares, cert, key)
4570
}
4671

4772
func loadCerts() (string, string, error) {

0 commit comments

Comments
 (0)