Skip to content

Commit 6f40050

Browse files
committed
"Implement retry logic for critical operations: socket initialization, OpenSearch connection, and gRPC serving to enhance resilience."
Signed-off-by: Osmany Montero <osmontero@icloud.com>
1 parent ee23785 commit 6f40050

2 files changed

Lines changed: 117 additions & 20 deletions

File tree

plugins/events/main.go

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net"
99
"os"
10+
"time"
1011

1112
"google.golang.org/grpc"
1213
)
@@ -16,35 +17,103 @@ type analysisServer struct {
1617
}
1718

1819
func main() {
19-
filePath, err := utils.MkdirJoin(plugins.WorkDir, "sockets")
20-
if err != nil {
21-
_ = catcher.Error("cannot create socket directory", err, nil)
22-
os.Exit(1)
20+
// Retry logic for initialization
21+
maxRetries := 3
22+
retryDelay := 2 * time.Second
23+
24+
// Initialize with retry logic instead of exiting
25+
var filePath utils.Folder
26+
var err error
27+
var socketPath string
28+
var unixAddress *net.UnixAddr
29+
var listener *net.UnixListener
30+
31+
// Retry loop for creating socket directory
32+
for retry := 0; retry < maxRetries; retry++ {
33+
filePath, err = utils.MkdirJoin(plugins.WorkDir, "sockets")
34+
if err == nil {
35+
break
36+
}
37+
38+
_ = catcher.Error("cannot create socket directory, retrying", err, map[string]any{
39+
"retry": retry + 1,
40+
"maxRetries": maxRetries,
41+
})
42+
43+
if retry < maxRetries-1 {
44+
time.Sleep(retryDelay)
45+
// Increase delay for next retry
46+
retryDelay *= 2
47+
} else {
48+
// If all retries failed, log the error and return
49+
_ = catcher.Error("all retries failed when creating socket directory", err, nil)
50+
return
51+
}
2352
}
2453

25-
socketPath := filePath.FileJoin("com.utmstack.events_analysis.sock")
54+
socketPath = filePath.FileJoin("com.utmstack.events_analysis.sock")
2655
_ = os.Remove(socketPath)
2756

28-
unixAddress, err := net.ResolveUnixAddr("unix", socketPath)
29-
if err != nil {
30-
_ = catcher.Error("cannot resolve unix address", err, nil)
31-
os.Exit(1)
57+
// Retry loop for resolving unix address
58+
retryDelay = 2 * time.Second
59+
for retry := 0; retry < maxRetries; retry++ {
60+
unixAddress, err = net.ResolveUnixAddr("unix", socketPath)
61+
if err == nil {
62+
break
63+
}
64+
65+
_ = catcher.Error("cannot resolve unix address, retrying", err, map[string]any{
66+
"retry": retry + 1,
67+
"maxRetries": maxRetries,
68+
})
69+
70+
if retry < maxRetries-1 {
71+
time.Sleep(retryDelay)
72+
// Increase delay for next retry
73+
retryDelay *= 2
74+
} else {
75+
// If all retries failed, log the error and return
76+
_ = catcher.Error("all retries failed when resolving unix address", err, nil)
77+
return
78+
}
3279
}
3380

3481
startQueue()
3582

36-
listener, err := net.ListenUnix("unix", unixAddress)
37-
if err != nil {
38-
_ = catcher.Error("cannot listen to unix socket", err, nil)
39-
os.Exit(1)
83+
// Retry loop for listening to unix socket
84+
retryDelay = 2 * time.Second
85+
for retry := 0; retry < maxRetries; retry++ {
86+
listener, err = net.ListenUnix("unix", unixAddress)
87+
if err == nil {
88+
break
89+
}
90+
91+
_ = catcher.Error("cannot listen to unix socket, retrying", err, map[string]any{
92+
"retry": retry + 1,
93+
"maxRetries": maxRetries,
94+
})
95+
96+
if retry < maxRetries-1 {
97+
time.Sleep(retryDelay)
98+
// Increase delay for next retry
99+
retryDelay *= 2
100+
} else {
101+
// If all retries failed, log the error and return
102+
_ = catcher.Error("all retries failed when listening to unix socket", err, nil)
103+
return
104+
}
40105
}
41106

42107
grpcServer := grpc.NewServer()
43108
plugins.RegisterAnalysisServer(grpcServer, &analysisServer{})
44109

110+
// Serve with error handling
45111
if err := grpcServer.Serve(listener); err != nil {
46112
_ = catcher.Error("cannot serve grpc", err, nil)
47-
os.Exit(1)
113+
// Instead of exiting, restart the main function
114+
time.Sleep(5 * time.Second)
115+
go main()
116+
return
48117
}
49118
}
50119

plugins/events/queue.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package main
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/threatwinds/go-sdk/catcher"
67
"github.com/threatwinds/go-sdk/plugins"
7-
"os"
88
"runtime"
99
"sync"
1010
"time"
@@ -16,16 +16,44 @@ import (
1616
var logs = make(chan string, 100*runtime.NumCPU())
1717

1818
func addToQueue(l string) {
19+
if len(logs) >= 100*runtime.NumCPU() {
20+
_ = catcher.Error("cannot enqueue log", fmt.Errorf("queue is full"), map[string]any{
21+
"queue": "logs",
22+
})
23+
24+
return
25+
}
26+
1927
logs <- l
2028
}
2129

2230
func startQueue() {
23-
osUrl := plugins.PluginCfg("com.utmstack", false).Get("opensearch").String()
31+
// Retry logic for connecting to OpenSearch
32+
maxRetries := 3
33+
retryDelay := 2 * time.Second
34+
35+
for retry := 0; retry < maxRetries; retry++ {
36+
osUrl := plugins.PluginCfg("com.utmstack", false).Get("opensearch").String()
37+
38+
err := opensearch.Connect([]string{osUrl})
39+
if err == nil {
40+
break
41+
}
42+
43+
_ = catcher.Error("cannot connect to OpenSearch, retrying", err, map[string]any{
44+
"retry": retry + 1,
45+
"maxRetries": maxRetries,
46+
})
2447

25-
err := opensearch.Connect([]string{osUrl})
26-
if err != nil {
27-
_ = catcher.Error("cannot connect to OpenSearch", err, nil)
28-
os.Exit(1)
48+
if retry < maxRetries-1 {
49+
time.Sleep(retryDelay)
50+
// Increase delay for next retry
51+
retryDelay *= 2
52+
} else {
53+
// If all retries failed, log the error and return
54+
_ = catcher.Error("all retries failed when connecting to OpenSearch", err, nil)
55+
return
56+
}
2957
}
3058

3159
numCPU := runtime.NumCPU() * 2

0 commit comments

Comments
 (0)