Skip to content

Commit d831110

Browse files
committed
"Add retry logic for socket initialization, OpenSearch connection, and document indexing with error handling and exponential backoff."
Signed-off-by: Osmany Montero <osmontero@icloud.com>
1 parent 5ea606c commit d831110

1 file changed

Lines changed: 104 additions & 22 deletions

File tree

plugins/stats/main.go

Lines changed: 104 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,60 @@ var successLock sync.Mutex
3535
func main() {
3636
ctx, cancel := context.WithCancel(context.Background())
3737

38-
filePath, err := utils.MkdirJoin(plugins.WorkDir, "sockets")
39-
if err != nil {
40-
_ = catcher.Error("cannot create directory", err, nil)
41-
os.Exit(1)
42-
}
38+
// Retry logic for initialization
39+
var filePath utils.Folder
40+
var err error
41+
var socketPath string
42+
var unixAddress *net.UnixAddr
43+
var listener *net.UnixListener
44+
45+
// Retry logic for creating socket directory
46+
maxRetries := 10
47+
retryDelay := 5 * time.Second
48+
49+
for retry := 0; retry < maxRetries; retry++ {
50+
filePath, err = utils.MkdirJoin(plugins.WorkDir, "sockets")
51+
if err != nil {
52+
_ = catcher.Error("cannot create directory, retrying", err, map[string]any{
53+
"retry": retry + 1,
54+
"maxRetries": maxRetries,
55+
})
56+
time.Sleep(retryDelay)
57+
continue
58+
}
59+
60+
socketPath = filePath.FileJoin("com.utmstack.stats_notification.sock")
61+
_ = os.Remove(socketPath)
4362

44-
socketPath := filePath.FileJoin("com.utmstack.stats_notification.sock")
45-
_ = os.Remove(socketPath)
63+
unixAddress, err = net.ResolveUnixAddr("unix", socketPath)
64+
if err != nil {
65+
_ = catcher.Error("cannot resolve unix address, retrying", err, map[string]any{
66+
"retry": retry + 1,
67+
"maxRetries": maxRetries,
68+
})
69+
time.Sleep(retryDelay)
70+
continue
71+
}
4672

47-
unixAddress, err := net.ResolveUnixAddr("unix", socketPath)
73+
listener, err = net.ListenUnix("unix", unixAddress)
74+
if err != nil {
75+
_ = catcher.Error("cannot listen to unix socket, retrying", err, map[string]any{
76+
"retry": retry + 1,
77+
"maxRetries": maxRetries,
78+
})
79+
time.Sleep(retryDelay)
80+
continue
81+
}
4882

49-
if err != nil {
50-
_ = catcher.Error("cannot resolve unix address", err, nil)
51-
os.Exit(1)
83+
// If we got here, initialization was successful
84+
break
5285
}
5386

54-
listener, err := net.ListenUnix("unix", unixAddress)
55-
if err != nil {
56-
_ = catcher.Error("cannot listen to unix socket", err, nil)
87+
// If all retries failed, log a final error and exit
88+
if listener == nil {
89+
_ = catcher.Error("all retries failed when initializing socket", nil, map[string]any{
90+
"maxRetries": maxRetries,
91+
})
5792
os.Exit(1)
5893
}
5994

@@ -67,8 +102,29 @@ func main() {
67102
pCfg := plugins.PluginCfg("com.utmstack", false)
68103
osUrl := pCfg.Get("opensearch").String()
69104

70-
if err := opensearch.Connect([]string{osUrl}); err != nil {
71-
_ = catcher.Error("cannot connect to ElasticSearch/OpenSearch", err, nil)
105+
// Retry logic for connecting to OpenSearch
106+
maxOSRetries := 10
107+
osRetryDelay := 5 * time.Second
108+
var osConnected bool
109+
110+
for retry := 0; retry < maxOSRetries; retry++ {
111+
err := opensearch.Connect([]string{osUrl})
112+
if err == nil {
113+
osConnected = true
114+
break
115+
}
116+
_ = catcher.Error("cannot connect to ElasticSearch/OpenSearch, retrying", err, map[string]any{
117+
"retry": retry + 1,
118+
"maxRetries": maxOSRetries,
119+
})
120+
time.Sleep(osRetryDelay)
121+
}
122+
123+
// If all retries failed, log a final error and exit
124+
if !osConnected {
125+
_ = catcher.Error("all retries failed when connecting to OpenSearch", nil, map[string]any{
126+
"maxRetries": maxOSRetries,
127+
})
72128
os.Exit(1)
73129
}
74130

@@ -79,7 +135,7 @@ func main() {
79135
defer wg.Done()
80136
if err := grpcServer.Serve(listener); err != nil {
81137
_ = catcher.Error("cannot serve grpc", err, nil)
82-
os.Exit(1)
138+
// Instead of exiting, just log the error and let the main function handle it
83139
}
84140
}()
85141

@@ -109,6 +165,8 @@ func main() {
109165

110166
grpcServer.GracefulStop()
111167
cancel()
168+
169+
wg.Wait()
112170
}
113171

114172
func (p *notificationServer) Notify(_ context.Context, msg *plugins.Message) (*emptypb.Empty, error) {
@@ -270,11 +328,35 @@ func sendStatistic(t string) {
270328
}
271329

272330
func saveToOpenSearch[Data any](data Data) {
273-
oCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
274-
defer cancel()
331+
// Retry logic for indexing document
332+
maxRetries := 3
333+
retryDelay := 2 * time.Second
275334

276-
err := opensearch.IndexDoc(oCtx, &data, fmt.Sprintf("v11-statistics-%s", time.Now().UTC().Format("2006.01")), uuid.NewString())
277-
if err != nil {
278-
_ = catcher.Error("cannot index document", err, map[string]any{})
335+
for retry := 0; retry < maxRetries; retry++ {
336+
oCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
337+
338+
err := opensearch.IndexDoc(oCtx, &data, fmt.Sprintf("v11-statistics-%s", time.Now().UTC().Format("2006.01")), uuid.NewString())
339+
cancel()
340+
341+
if err == nil {
342+
// Successfully indexed document
343+
return
344+
}
345+
346+
_ = catcher.Error("cannot index document, retrying", err, map[string]any{
347+
"retry": retry + 1,
348+
"maxRetries": maxRetries,
349+
})
350+
351+
if retry < maxRetries-1 {
352+
time.Sleep(retryDelay)
353+
// Increase delay for next retry (exponential backoff)
354+
retryDelay *= 2
355+
}
279356
}
357+
358+
// After all retries, log a final error
359+
_ = catcher.Error("all retries failed when indexing document", nil, map[string]any{
360+
"maxRetries": maxRetries,
361+
})
280362
}

0 commit comments

Comments
 (0)