Skip to content

Commit 6718509

Browse files
committed
fix(soc-ai): reduce CPU overuse in plugin
1 parent aba899f commit 6718509

4 files changed

Lines changed: 134 additions & 85 deletions

File tree

installer/docker/plugins.go

Lines changed: 18 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -13,17 +13,18 @@ type PluginsConfig struct {
1313
}
1414

1515
type PluginConfig struct {
16-
Order []string `yaml:"order,omitempty"`
17-
Port int `yaml:"port,omitempty"`
18-
RulesFolder string `yaml:"rulesFolder,omitempty"`
19-
GeoIPFolder string `yaml:"geoipFolder,omitempty"`
20-
OpenSearch string `yaml:"opensearch,omitempty"`
21-
PostgreSQL PostgreConfig `yaml:"postgresql,omitempty"`
22-
ServerName string `yaml:"serverName,omitempty"`
23-
InternalKey string `yaml:"internalKey,omitempty"`
24-
AgentManager string `yaml:"agentManager,omitempty"`
25-
Backend string `yaml:"backend,omitempty"`
26-
CertsFolder string `yaml:"certsFolder,omitempty"`
16+
Order []string `yaml:"order,omitempty"`
17+
Port int `yaml:"port,omitempty"`
18+
RulesFolder string `yaml:"rulesFolder,omitempty"`
19+
GeoIPFolder string `yaml:"geoipFolder,omitempty"`
20+
OpenSearch string `yaml:"opensearch,omitempty"`
21+
PostgreSQL PostgreConfig `yaml:"postgresql,omitempty"`
22+
ServerName string `yaml:"serverName,omitempty"`
23+
InternalKey string `yaml:"internalKey,omitempty"`
24+
AgentManager string `yaml:"agentManager,omitempty"`
25+
Backend string `yaml:"backend,omitempty"`
26+
CertsFolder string `yaml:"certsFolder,omitempty"`
27+
ModulesConfig string `yaml:"modulesConfig,omitempty"`
2728
}
2829

2930
type PostgreConfig struct {
@@ -75,11 +76,12 @@ func SetPluginsConfigs(conf *config.Config, stack *StackConfig) error {
7576
Password: conf.Password,
7677
Database: "utmstack",
7778
},
78-
ServerName: conf.ServerName,
79-
InternalKey: conf.InternalKey,
80-
AgentManager: "10.21.199.3:9000",
81-
Backend: "http://backend:8080",
82-
CertsFolder: "/cert",
79+
ServerName: conf.ServerName,
80+
InternalKey: conf.InternalKey,
81+
AgentManager: "10.21.199.3:9000",
82+
Backend: "http://backend:8080",
83+
CertsFolder: "/cert",
84+
ModulesConfig: "event-processor-manager:9003",
8385
}
8486

8587
openSearchPipeline := PluginsConfig{}

plugins/soc-ai/alert.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package main
22

33
import (
4-
"regexp"
5-
64
"github.com/threatwinds/go-sdk/plugins"
75
"github.com/utmstack/UTMStack/plugins/soc-ai/config"
86
"github.com/utmstack/UTMStack/plugins/soc-ai/schema"
@@ -40,7 +38,7 @@ func cleanAlerts(alert schema.AlertFields) schema.AlertFields {
4038
original := v.StringValue
4139
cleaned := original
4240
for _, pattern := range config.SensitivePatterns {
43-
re := regexp.MustCompile(pattern.Regexp)
41+
re := pattern.GetRegexp()
4442
cleaned = re.ReplaceAllString(cleaned, pattern.FakeValue)
4543
}
4644
if cleaned != original {

plugins/soc-ai/config/config.go

Lines changed: 99 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Config struct {
3737
Backend string
3838
InternalKey string
3939
Opensearch string
40+
ModulesConfigHost string
4041
APIKey string
4142
ChangeAlertStatus bool
4243
AutomaticIncidentCreation bool
@@ -64,14 +65,12 @@ func StartConfigurationSystem() {
6465
config.Backend = pluginConfig.Get("backend").String()
6566
config.InternalKey = pluginConfig.Get("internalKey").String()
6667
config.Opensearch = pluginConfig.Get("opensearch").String()
68+
config.ModulesConfigHost = pluginConfig.Get("modulesConfig").String()
6769
configMutex.Unlock()
6870

6971
utils.Logger.Info("Starting gRPC configuration client...")
7072

71-
ctx, cancel := context.WithCancel(context.Background())
72-
defer cancel()
73-
74-
go ConnectAndStreamConfig("localhost:9003", config.InternalKey)
73+
go ConnectAndStreamConfig(config.ModulesConfigHost, config.InternalKey)
7574

7675
ticker := time.NewTicker(5 * time.Second)
7776
defer ticker.Stop()
@@ -91,84 +90,120 @@ func StartConfigurationSystem() {
9190
} else {
9291
utils.Logger.LogF(100, "No gRPC configuration available yet...")
9392
}
94-
case <-ctx.Done():
93+
case <-configShutdown:
94+
utils.Logger.Info("StartConfigurationSystem shutting down...")
9595
return
9696
}
9797
}
9898
}
9999

100+
var (
101+
configShutdown = make(chan struct{})
102+
)
103+
104+
func ShutdownConfigSystem() {
105+
close(configShutdown)
106+
}
107+
100108
func ConnectAndStreamConfig(serverAddress, internalKey string) {
101109
for {
102-
func() {
103-
ctx, cancel := context.WithCancel(context.Background())
104-
defer cancel()
105-
ctx = metadata.AppendToOutgoingContext(ctx, "internal-key", internalKey)
106-
conn, err := grpc.NewClient(
107-
serverAddress,
108-
grpc.WithTransportCredentials(insecure.NewCredentials()),
109-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)),
110-
)
110+
select {
111+
case <-configShutdown:
112+
utils.Logger.Info("ConnectAndStreamConfig shutting down...")
113+
return
114+
default:
115+
}
111116

112-
if err != nil {
113-
catcher.Error("Failed to connect to server", err, nil)
114-
return
115-
}
117+
connCtx, connCancel := context.WithCancel(context.Background())
118+
connCtx = metadata.AppendToOutgoingContext(connCtx, "internal-key", internalKey)
119+
conn, err := grpc.NewClient(
120+
serverAddress,
121+
grpc.WithTransportCredentials(insecure.NewCredentials()),
122+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)),
123+
)
124+
125+
if err != nil {
126+
catcher.Error("Failed to connect to server", err, nil)
127+
connCancel()
128+
time.Sleep(reconnectDelay)
129+
continue
130+
}
116131

117-
state := conn.GetState()
118-
if state == connectivity.Shutdown || state == connectivity.TransientFailure {
119-
catcher.Error("Connection is in shutdown or transient failure state", nil, nil)
120-
return
121-
}
132+
state := conn.GetState()
133+
if state == connectivity.Shutdown || state == connectivity.TransientFailure {
134+
catcher.Error("Connection is in shutdown or transient failure state", nil, nil)
135+
conn.Close()
136+
connCancel()
137+
time.Sleep(reconnectDelay)
138+
continue
139+
}
122140

123-
client := NewConfigServiceClient(conn)
124-
stream, err := client.StreamConfig(ctx)
125-
if err != nil {
126-
catcher.Error("Failed to create stream", err, nil)
127-
return
128-
}
141+
client := NewConfigServiceClient(conn)
142+
stream, err := client.StreamConfig(connCtx)
143+
if err != nil {
144+
catcher.Error("Failed to create stream", err, nil)
145+
conn.Close()
146+
connCancel()
147+
time.Sleep(reconnectDelay)
148+
continue
149+
}
129150

130-
err = stream.Send(&BiDirectionalMessage{
131-
Payload: &BiDirectionalMessage_PluginInit{
132-
PluginInit: &PluginInit{Type: PluginType_SOC_AI},
133-
},
134-
})
135-
if err != nil {
136-
catcher.Error("Failed to send PluginInit", err, nil)
151+
err = stream.Send(&BiDirectionalMessage{
152+
Payload: &BiDirectionalMessage_PluginInit{
153+
PluginInit: &PluginInit{Type: PluginType_SOC_AI},
154+
},
155+
})
156+
if err != nil {
157+
catcher.Error("Failed to send PluginInit", err, nil)
158+
conn.Close()
159+
connCancel()
160+
time.Sleep(reconnectDelay)
161+
continue
162+
}
163+
164+
for {
165+
select {
166+
case <-configShutdown:
167+
conn.Close()
168+
connCancel()
137169
return
170+
default:
138171
}
139172

140-
for {
141-
in, err := stream.Recv()
142-
if err != nil {
143-
if strings.Contains(err.Error(), "EOF") {
144-
catcher.Info("Stream closed by server, reconnecting...", nil)
145-
conn.Close()
146-
time.Sleep(reconnectDelay)
147-
break
148-
}
149-
st, ok := status.FromError(err)
150-
if ok && (st.Code() == codes.Unavailable || st.Code() == codes.Canceled) {
151-
catcher.Error("Stream error: "+st.Message(), err, nil)
152-
conn.Close()
153-
time.Sleep(reconnectDelay)
154-
break
155-
} else {
156-
catcher.Error("Stream receive error", err, nil)
157-
time.Sleep(reconnectDelay)
158-
continue
159-
}
173+
in, err := stream.Recv()
174+
if err != nil {
175+
if strings.Contains(err.Error(), "EOF") {
176+
catcher.Info("Stream closed by server, reconnecting...", nil)
177+
conn.Close()
178+
connCancel()
179+
time.Sleep(reconnectDelay)
180+
break
160181
}
161-
162-
switch message := in.Payload.(type) {
163-
case *BiDirectionalMessage_Config:
164-
log.Printf("Received configuration update: %v", message.Config)
165-
grpcMutex.Lock()
166-
grpcConfig = message.Config
167-
grpcMutex.Unlock()
182+
st, ok := status.FromError(err)
183+
if ok && (st.Code() == codes.Unavailable || st.Code() == codes.Canceled) {
184+
catcher.Error("Stream error: "+st.Message(), err, nil)
185+
conn.Close()
186+
connCancel()
187+
time.Sleep(reconnectDelay)
188+
break
189+
} else {
190+
catcher.Error("Stream receive error", err, nil)
191+
time.Sleep(reconnectDelay)
192+
continue
168193
}
169194
}
170-
}()
171195

196+
switch message := in.Payload.(type) {
197+
case *BiDirectionalMessage_Config:
198+
log.Printf("Received configuration update: %v", message.Config)
199+
grpcMutex.Lock()
200+
grpcConfig = message.Config
201+
grpcMutex.Unlock()
202+
}
203+
}
204+
205+
conn.Close()
206+
connCancel()
172207
time.Sleep(reconnectDelay)
173208
}
174209
}

plugins/soc-ai/config/const.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package config
22

3+
import (
4+
"regexp"
5+
"sync"
6+
)
7+
38
const (
49
API_ALERT_ENDPOINT = "/api/elasticsearch/search"
510
API_ALERT_STATUS_ENDPOINT = "/api/utm-alerts/status"
@@ -44,14 +49,23 @@ var (
4449
type SensitivePattern struct {
4550
Regexp string
4651
FakeValue string
52+
compiled *regexp.Regexp
53+
once sync.Once
54+
}
55+
56+
func (sp *SensitivePattern) GetRegexp() *regexp.Regexp {
57+
sp.once.Do(func() {
58+
sp.compiled = regexp.MustCompile(sp.Regexp)
59+
})
60+
return sp.compiled
4761
}
4862

4963
var (
5064
FakeUserName = "John Doe"
5165
FakeEmail = "jhondoe@gmail.com"
52-
SensitivePatterns = map[string]SensitivePattern{
66+
SensitivePatterns = map[string]*SensitivePattern{
5367
"email": {Regexp: `([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})`, FakeValue: "jhondoe@gmail.com"},
54-
//"ipv4": `(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)`,
68+
//"ipv4": {Regexp: `(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)`, FakeValue: "10.0.0.1"},
5569
}
5670
GPT_INSTRUCTION = "You are an expert security engineer. Perform a deep analysis of an alert created by a SIEM and the logs related to it. Determine if the alert could be an actual potential threat or not and explain why. Provide a description that shows a deep understanding of the alert based on a deep analysis of its logs and estimate the risk to the systems affected. Classify the alert in the following manner: if the alert information is sufficient to determine that the security, availability, confidentiality, or integrity of the systems has being compromised, then classify it as \"possible incident\". If the alert does not pose a security risk to the organization or has no security relevance, classify it as \"possible false positive\". If the alert does not pose an imminent risk to the systems, requires no urgent action from an administrator, or requires not urgent review by an administrator, it should be classified as a \"standard alert\". You will also provide context-specific instructions for remediation, mitigation, or further investigation, related to the alert and logs analyzed. Your answer should be provided using the following JSON format and the total number of characters in your answer must not exceed 1500 words. Your entire answer must be inside this json format. {\"activity_id\":\"<activity_id>\",\"classification\":\"<classification>\",\"reasoning\":[\"<deep_reasoning>\"],\"nextSteps\":[{\"step\":1,\"action\":\"<action_1>\",\"details\":\"<action_1_details>\"},{\"step\":2,\"action\":\"<action_2>\",\"details\":\"<action_2_details>\"},{\"step\":3,\"action\":\"<action_3>\"]}Ensure that your entire answer adheres to the provided JSON format. The response should be valid JSON syntax and schema."
5771
GPT_FALSE_POSITIVE = "This alert is categorized as a potential false positive due to two key factors. Firstly, it originates from an automated system, which may occasionally produce alerts without direct human validation. Additionally, the absence of any correlated logs further raises suspicion, as a genuine incident typically leaves a trail of relevant log entries. Hence, the combination of its system-generated nature and the lack of associated logs suggests a likelihood of being a false positive rather than a genuine security incident."

0 commit comments

Comments
 (0)