Skip to content

Commit 9f21d6b

Browse files
committed
refactor(plugins): simplify plugin initialization and improve error handling
1 parent a18cfb9 commit 9f21d6b

File tree

6 files changed

+38
-223
lines changed

6 files changed

+38
-223
lines changed

plugins/alerts/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"os"
67
"strings"
78
"time"
89

@@ -58,9 +59,16 @@ func main() {
5859
err := sdkos.Connect([]string{openSearchUrl}, "", "")
5960
if err != nil {
6061
_ = catcher.Error("cannot connect to OpenSearch", err, map[string]any{"process": "plugin_com.utmstack.alerts"})
62+
os.Exit(1)
6163
}
6264

63-
_ = plugins.InitCorrelationPlugin("com.utmstack.alerts", correlate)
65+
err = plugins.InitCorrelationPlugin("com.utmstack.alerts", correlate)
66+
if err != nil {
67+
_ = catcher.Error("com.utmstack.alerts", err, map[string]any{
68+
"process": "plugin_com.utmstack.alerts",
69+
})
70+
os.Exit(1)
71+
}
6472
}
6573

6674
func correlate(_ context.Context,

plugins/events/main.go

Lines changed: 6 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -2,126 +2,26 @@ package main
22

33
import (
44
"io"
5-
"net"
65
"os"
7-
"time"
86

97
"github.com/threatwinds/go-sdk/catcher"
108
"github.com/threatwinds/go-sdk/plugins"
119
"github.com/threatwinds/go-sdk/utils"
12-
13-
"google.golang.org/grpc"
1410
)
1511

16-
type analysisServer struct {
17-
plugins.UnimplementedAnalysisServer
18-
}
19-
2012
func main() {
21-
// Retry logic for initialization
22-
maxRetries := 3
23-
retryDelay := 2 * time.Second
24-
25-
// Initialize with retry logic instead of exiting
26-
var filePath utils.Folder
27-
var err error
28-
var socketPath string
29-
var unixAddress *net.UnixAddr
30-
var listener *net.UnixListener
31-
32-
// Retry loop for creating socket directory
33-
for retry := 0; retry < maxRetries; retry++ {
34-
filePath, err = utils.MkdirJoin(plugins.WorkDir, "sockets")
35-
if err == nil {
36-
break
37-
}
38-
39-
_ = catcher.Error("cannot create socket directory, retrying", err, map[string]any{
40-
"process": "plugin_com.utmstack.events",
41-
"retry": retry + 1,
42-
"maxRetries": maxRetries,
43-
})
44-
45-
if retry < maxRetries-1 {
46-
time.Sleep(retryDelay)
47-
// Increase delay for next retry
48-
retryDelay *= 2
49-
} else {
50-
// If all retries failed, log the error and return
51-
_ = catcher.Error("all retries failed when creating socket directory", err, map[string]any{"process": "plugin_com.utmstack.events"})
52-
return
53-
}
54-
}
55-
56-
socketPath = filePath.FileJoin("com.utmstack.events_analysis.sock")
57-
_ = os.Remove(socketPath)
58-
59-
// Retry loop for resolving unix address
60-
retryDelay = 2 * time.Second
61-
for retry := 0; retry < maxRetries; retry++ {
62-
unixAddress, err = net.ResolveUnixAddr("unix", socketPath)
63-
if err == nil {
64-
break
65-
}
66-
67-
_ = catcher.Error("cannot resolve unix address, retrying", err, map[string]any{
68-
"process": "plugin_com.utmstack.events",
69-
"retry": retry + 1,
70-
"maxRetries": maxRetries,
71-
})
72-
73-
if retry < maxRetries-1 {
74-
time.Sleep(retryDelay)
75-
// Increase delay for next retry
76-
retryDelay *= 2
77-
} else {
78-
// If all retries failed, log the error and return
79-
_ = catcher.Error("all retries failed when resolving unix address", err, map[string]any{"process": "plugin_com.utmstack.events"})
80-
return
81-
}
82-
}
83-
8413
startQueue()
8514

86-
// Retry loop for listening to unix socket
87-
retryDelay = 2 * time.Second
88-
for retry := 0; retry < maxRetries; retry++ {
89-
listener, err = net.ListenUnix("unix", unixAddress)
90-
if err == nil {
91-
break
92-
}
93-
94-
_ = catcher.Error("cannot listen to unix socket, retrying", err, map[string]any{
95-
"process": "plugin_com.utmstack.events",
96-
"retry": retry + 1,
97-
"maxRetries": maxRetries,
15+
err := plugins.InitAnalysisPlugin("com.utmstack.events", analyze)
16+
if err != nil {
17+
_ = catcher.Error("failed to start analysis plugin", err, map[string]any{
18+
"process": "plugin_com.utmstack.events",
9819
})
99-
100-
if retry < maxRetries-1 {
101-
time.Sleep(retryDelay)
102-
// Increase delay for next retry
103-
retryDelay *= 2
104-
} else {
105-
// If all retries failed, log the error and return
106-
_ = catcher.Error("all retries failed when listening to unix socket", err, map[string]any{"process": "plugin_com.utmstack.events"})
107-
return
108-
}
109-
}
110-
111-
grpcServer := grpc.NewServer()
112-
plugins.RegisterAnalysisServer(grpcServer, &analysisServer{})
113-
114-
// Serve with error handling
115-
if err := grpcServer.Serve(listener); err != nil {
116-
_ = catcher.Error("cannot serve grpc", err, map[string]any{"process": "plugin_com.utmstack.events"})
117-
// Instead of exiting, restart the main function
118-
time.Sleep(5 * time.Second)
119-
go main()
120-
return
20+
os.Exit(1)
12121
}
12222
}
12323

124-
func (p *analysisServer) Analyze(event *plugins.Event, _ plugins.Analysis_AnalyzeServer) error {
24+
func analyze(event *plugins.Event, _ plugins.Analysis_AnalyzeServer) error {
12525
jLog, err := utils.ProtoMessageToString(event)
12626
if err != nil {
12727
return catcher.Error("cannot convert event to json", err, map[string]any{"process": "plugin_com.utmstack.events"})

plugins/geolocation/bases.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ import (
1313

1414
// loadGeolocationData loads the geolocation files and populates the maps
1515
func loadGeolocationData() {
16-
// Get the geolocation directory from environment variable or use default
17-
workdir := plugins.WorkDir
18-
geoDir, err := utils.MkdirJoin(workdir, "geolocation")
16+
geoDir, err := utils.MkdirJoin(plugins.WorkDir, "geolocation")
1917
if err != nil {
2018
_ = catcher.Error("could not create geolocation directory", err, map[string]any{"process": "plugin_com.utmstack.geolocation"})
2119
return

plugins/geolocation/main.go

Lines changed: 7 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2,129 +2,29 @@ package main
22

33
import (
44
"context"
5-
"net"
65
"os"
7-
"time"
86

97
"github.com/threatwinds/go-sdk/catcher"
108
"github.com/threatwinds/go-sdk/plugins"
119
"github.com/threatwinds/go-sdk/utils"
1210

1311
"github.com/tidwall/gjson"
1412
"github.com/tidwall/sjson"
15-
"google.golang.org/grpc"
1613
)
1714

18-
type parsingServer struct {
19-
plugins.UnimplementedParsingServer
20-
}
21-
2215
func main() {
23-
// Retry logic for creating socket directory
24-
maxRetries := 3
25-
retryDelay := 2 * time.Second
26-
27-
var filePath utils.Folder
28-
var err error
29-
30-
for retry := 0; retry < maxRetries; retry++ {
31-
filePath, err = utils.MkdirJoin(plugins.WorkDir, "sockets")
32-
if err == nil {
33-
break
34-
}
35-
36-
_ = catcher.Error("cannot create directory, retrying", err, map[string]any{
37-
"process": "plugin_com.utmstack.geolocation",
38-
"retry": retry + 1,
39-
"maxRetries": maxRetries,
40-
})
41-
42-
if retry < maxRetries-1 {
43-
time.Sleep(retryDelay)
44-
// Increase delay for next retry
45-
retryDelay *= 2
46-
} else {
47-
// If all retries failed, log the error and return
48-
_ = catcher.Error("all retries failed when creating directory", err, map[string]any{"process": "plugin_com.utmstack.geolocation"})
49-
return
50-
}
51-
}
52-
53-
socketPath := filePath.FileJoin("com.utmstack.geolocation_parsing.sock")
54-
_ = os.Remove(socketPath)
55-
56-
// Retry logic for resolving unix address
57-
retryDelay = 2 * time.Second
58-
var unixAddress *net.UnixAddr
59-
60-
for retry := 0; retry < maxRetries; retry++ {
61-
unixAddress, err = net.ResolveUnixAddr("unix", socketPath)
62-
if err == nil {
63-
break
64-
}
65-
66-
_ = catcher.Error("cannot resolve unix address, retrying", err, map[string]any{
67-
"process": "plugin_com.utmstack.geolocation",
68-
"retry": retry + 1,
69-
"maxRetries": maxRetries,
70-
})
71-
72-
if retry < maxRetries-1 {
73-
time.Sleep(retryDelay)
74-
// Increase delay for next retry
75-
retryDelay *= 2
76-
} else {
77-
// If all retries failed, log the error and return
78-
_ = catcher.Error("all retries failed when resolving unix address", err, map[string]any{"process": "plugin_com.utmstack.geolocation"})
79-
return
80-
}
81-
}
82-
83-
// Retry logic for listening to unix socket
84-
retryDelay = 2 * time.Second
85-
var listener *net.UnixListener
86-
87-
for retry := 0; retry < maxRetries; retry++ {
88-
listener, err = net.ListenUnix("unix", unixAddress)
89-
if err == nil {
90-
break
91-
}
92-
93-
_ = catcher.Error("cannot listen to unix socket, retrying", err, map[string]any{
94-
"process": "plugin_com.utmstack.geolocation",
95-
"retry": retry + 1,
96-
"maxRetries": maxRetries,
97-
})
98-
99-
if retry < maxRetries-1 {
100-
time.Sleep(retryDelay)
101-
// Increase delay for next retry
102-
retryDelay *= 2
103-
} else {
104-
// If all retries failed, log the error and return
105-
_ = catcher.Error("all retries failed when listening to unix socket", err, map[string]any{"process": "plugin_com.utmstack.geolocation"})
106-
return
107-
}
108-
}
109-
110-
grpcServer := grpc.NewServer()
111-
plugins.RegisterParsingServer(grpcServer, &parsingServer{})
112-
11316
go loadGeolocationData()
11417

115-
// Serve with error handling and retry logic
116-
for {
117-
err := grpcServer.Serve(listener)
118-
if err == nil {
119-
break
120-
}
121-
122-
_ = catcher.Error("cannot serve grpc, retrying", err, map[string]any{"process": "plugin_com.utmstack.geolocation"})
123-
time.Sleep(5 * time.Second)
18+
err := plugins.InitParsingPlugin("com.utmstack.geolocation", parseLog)
19+
if err != nil {
20+
_ = catcher.Error("com.utmstack.geolocation", err, map[string]any{
21+
"process": "plugin_com.utmstack.geolocation",
22+
})
23+
os.Exit(1)
12424
}
12525
}
12626

127-
func (p *parsingServer) ParseLog(_ context.Context, transform *plugins.Transform) (*plugins.Draft, error) {
27+
func parseLog(_ context.Context, transform *plugins.Transform) (*plugins.Draft, error) {
12828
source, ok := transform.Step.Dynamic.Params["source"]
12929
if !ok {
13030
return transform.Draft, catcher.Error("'source' parameter required", nil, map[string]any{"process": "plugin_com.utmstack.geolocation"})

plugins/soc-ai/main.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"os"
56
"time"
67

78
"github.com/threatwinds/go-sdk/catcher"
@@ -11,10 +12,6 @@ import (
1112
"google.golang.org/protobuf/types/known/emptypb"
1213
)
1314

14-
type socAiServer struct {
15-
plugins.UnimplementedCorrelationServer
16-
}
17-
1815
func main() {
1916
utils.Logger.Info("Starting soc-ai plugin...")
2017

@@ -23,7 +20,13 @@ func main() {
2320
time.Sleep(2 * time.Second)
2421
InitializeQueue()
2522

26-
_ = plugins.InitCorrelationPlugin("com.utmstack.soc-ai", correlate)
23+
err := plugins.InitCorrelationPlugin("com.utmstack.soc-ai", correlate)
24+
if err != nil {
25+
_ = catcher.Error("failed to start correlation plugin", err, map[string]any{
26+
"process": "plugin_com.utmstack.soc-ai",
27+
})
28+
os.Exit(1)
29+
}
2730
}
2831

2932
func correlate(_ context.Context,

plugins/stats/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,13 @@ func main() {
5454
saveToDB(ctx, "success")
5555
}()
5656

57-
_ = plugins.InitNotificationPlugin("com.utmstack.stats", notify)
57+
err = plugins.InitNotificationPlugin("com.utmstack.stats", notify)
58+
if err != nil {
59+
_ = catcher.Error("failed to start notification plugin", err, map[string]any{
60+
"process": "plugin_com.utmstack.stats",
61+
})
62+
os.Exit(1)
63+
}
5864

5965
cancel()
6066
wg.Wait()

0 commit comments

Comments
 (0)