Skip to content

Commit 3e8bb2b

Browse files
committed
Add caching mechanism and re-enable threat intelligence queue
Implemented a cache for blocklist checks to improve performance and reduced redundant processing. Re-enabled the threat intelligence queueing mechanism to ensure logs are properly analyzed and flagged. Additionally, cleaned up unused code and improved the blocklisted element handling logic.
1 parent 9019707 commit 3e8bb2b

3 files changed

Lines changed: 82 additions & 52 deletions

File tree

correlation/api/newLogHandler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6+
"github.com/utmstack/UTMStack/correlation/ti"
67
"io"
78
"log"
89
"net/http"
@@ -74,7 +75,7 @@ func NewLog(c *gin.Context) {
7475
}
7576

7677
cache.AddToCache(l)
77-
//ti.Enqueue(l)
78+
ti.Enqueue(l)
7879
search.AddToQueue(l)
7980
response["status"] = "queued"
8081
c.JSON(http.StatusOK, response)

correlation/ti/bases.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ func Load() {
1212

1313
var files = []string{
1414
"ip_blocklist.list",
15-
//"domain_blocklist.list",
16-
//"hostname_blocklist.list",
1715
}
1816

1917
for _, file := range files {
@@ -22,10 +20,6 @@ func Load() {
2220
switch file {
2321
case "ip_blocklist.list":
2422
t = "IP"
25-
//case "domain_blocklist.list":
26-
// t = "domain"
27-
//case "hostname_blocklist.list":
28-
// t = "hostname"
2923
}
3024

3125
f, err := os.Open(filepath.Join("/app", file))

correlation/ti/ti.go

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,68 @@ import (
55
"github.com/utmstack/UTMStack/correlation/correlation"
66
"github.com/utmstack/UTMStack/correlation/utils"
77
"runtime"
8+
"strings"
9+
"sync"
10+
"time"
811
)
912

13+
type Cache map[string]string
14+
1015
var blockList map[string]string
1116
var channel chan string
17+
var cache Cache
18+
var cacheLock *sync.RWMutex
1219

1320
func init() {
1421
blockList = make(map[string]string, 10000)
1522
channel = make(chan string, 10000)
23+
cache = make(Cache, 10000)
24+
cacheLock = &sync.RWMutex{}
25+
cache.AutoClean()
26+
}
27+
28+
func blocked(log string) bool {
29+
log = strings.ToLower(log)
30+
31+
exclusionList := []string{
32+
"block",
33+
"denied",
34+
"drop",
35+
"reject",
36+
"deny",
37+
}
38+
39+
for _, e := range exclusionList {
40+
if strings.Contains(log, e) {
41+
return true
42+
}
43+
}
44+
45+
return false
46+
}
47+
48+
func (c *Cache) Add(k, v string) {
49+
cacheLock.Lock()
50+
defer cacheLock.Unlock()
51+
(*c)[k] = v
52+
}
53+
54+
func (c *Cache) IsCached(k string) bool {
55+
cacheLock.RLock()
56+
defer cacheLock.RUnlock()
57+
_, ok := (*c)[k]
58+
return ok
59+
}
60+
61+
func (c *Cache) AutoClean() {
62+
go func() {
63+
for {
64+
cacheLock.Lock()
65+
cache = make(Cache, 10000)
66+
cacheLock.Unlock()
67+
time.Sleep(8 * time.Hour)
68+
}
69+
}()
1670
}
1771

1872
func IsBlocklisted() {
@@ -48,53 +102,30 @@ func IsBlocklisted() {
48102
sourceIp := gjson.Get(log, "logx.*.src_ip")
49103
destinationIp := gjson.Get(log, "logx.*.dest_ip")
50104

51-
for key, value := range blockList {
52-
var stop bool
53-
54-
switch value {
55-
case "IP":
56-
if sourceIp.String() == key {
57-
correlation.Alert(
58-
"Connection attempt from a malicious IP",
59-
"Low",
60-
"A blocklisted element has been identified in the logs. Further investigation is recommended.",
61-
"",
62-
"Threat Intelligence",
63-
"",
64-
[]string{"https://threatwinds.com"},
65-
gjson.Get(log, "dataType").String(),
66-
gjson.Get(log, "dataSource").String(),
67-
utils.ExtractDetails(saveFields, log),
68-
)
69-
70-
stop = true
71-
}
72-
73-
if destinationIp.String() == key {
74-
correlation.Alert(
75-
"Connection attempt to a malicious IP",
76-
"High",
77-
"A blocklisted element has been identified in the logs. Further investigation is recommended.",
78-
"",
79-
"Threat Intelligence",
80-
"",
81-
[]string{"https://threatwinds.com"},
82-
gjson.Get(log, "dataType").String(),
83-
gjson.Get(log, "dataSource").String(),
84-
utils.ExtractDetails(saveFields, log),
85-
)
86-
87-
stop = true
88-
}
89-
}
105+
if !cache.IsCached(sourceIp.String()) {
106+
if _, ok := blockList[sourceIp.String()]; ok && !blocked(log) {
107+
correlation.Alert(
108+
"Connection attempt from a malicious IP",
109+
"Low",
110+
"A blocklisted element has been identified in the logs. Further investigation is recommended.",
111+
"",
112+
"Threat Intelligence",
113+
"",
114+
[]string{"https://threatwinds.com"},
115+
gjson.Get(log, "dataType").String(),
116+
gjson.Get(log, "dataSource").String(),
117+
utils.ExtractDetails(saveFields, log),
118+
)
90119

91-
if stop {
92-
break
93120
}
94121

95-
/*if strings.Contains(log, key) {
122+
cache.Add(sourceIp.String(), "true")
123+
}
124+
125+
if !cache.IsCached(destinationIp.String()) {
126+
if _, ok := blockList[destinationIp.String()]; ok && !blocked(log) {
96127
correlation.Alert(
97-
fmt.Sprintf("Malicious %s found in log: %s", value, key),
128+
"Connection attempt from a malicious IP",
98129
"Low",
99130
"A blocklisted element has been identified in the logs. Further investigation is recommended.",
100131
"",
@@ -105,15 +136,19 @@ func IsBlocklisted() {
105136
gjson.Get(log, "dataSource").String(),
106137
utils.ExtractDetails(saveFields, log),
107138
)
139+
}
108140

109-
break
110-
}*/
141+
cache.Add(destinationIp.String(), "true")
111142
}
112143
}
113144
}()
114145
}
115146
}
116147

117148
func Enqueue(log string) {
149+
if len(channel) >= 10000 {
150+
return
151+
}
152+
118153
channel <- log
119154
}

0 commit comments

Comments
 (0)