Skip to content

Commit 7bc2ead

Browse files
committed
refactor[plugins](alerts): group caches and tag-rule engine into cache.go and tagrules.go
1 parent 08ea333 commit 7bc2ead

10 files changed

Lines changed: 420 additions & 387 deletions

File tree

backend/modules/alerts/connectors/repository.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@ type AlertRepository interface {
1717
CountOpenAlerts(ctx context.Context) (int64, error)
1818
CountByStatus(ctx context.Context, status int) (int64, error)
1919
SearchByIDs(ctx context.Context, alertIDs []string) ([]domain.UtmAlert, error)
20-
AssignAssetGroups(ctx context.Context, mapping map[string]AssetGroupRef) error
21-
}
22-
23-
type AssetGroupRef struct {
24-
ID int64
25-
Name string
2620
}
2721

2822
type HistoryEntry struct {

backend/modules/alerts/module.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package alerts
22

33
import (
4-
"context"
5-
6-
"github.com/threatwinds/go-sdk/catcher"
74
"github.com/utmstack/utmstack/backend/modules/alerts/connectors"
85
"github.com/utmstack/utmstack/backend/modules/alerts/handler"
96
"github.com/utmstack/utmstack/backend/modules/alerts/repository"
@@ -17,14 +14,9 @@ type Module struct {
1714
alertTagHandler *handler.AlertTagHandler
1815
alertTagRuleHandler *handler.AlertTagRuleHandler
1916
adversaryHandler *handler.AdversaryHandler
20-
scheduler *usecase.Scheduler
21-
schedulerEnabled bool
2217
}
2318

24-
func NewModule(
25-
db *gorm.DB,
26-
schedulerEnabled bool,
27-
) *Module {
19+
func NewModule(db *gorm.DB) *Module {
2820
alertRepo := repository.NewOSAlertRepository()
2921

3022
historyRecorder := repository.NewHistoryRecorder()
@@ -40,8 +32,6 @@ func NewModule(
4032
alertTagRuleUC := usecase.NewAlertTagRuleUsecase(alertTagRuleRepo, alertTagRepo)
4133
alertTagRuleH := handler.NewAlertTagRuleHandler(alertTagRuleUC)
4234

43-
sched := usecase.NewScheduler(alertRepo)
44-
4535
adversaryUC := usecase.NewAdversaryUsecase()
4636
adversaryH := handler.NewAdversaryHandler(adversaryUC)
4737

@@ -51,18 +41,7 @@ func NewModule(
5141
alertTagHandler: alertTagH,
5242
alertTagRuleHandler: alertTagRuleH,
5343
adversaryHandler: adversaryH,
54-
scheduler: sched,
55-
schedulerEnabled: schedulerEnabled,
56-
}
57-
}
58-
59-
func (m *Module) Start(ctx context.Context) {
60-
if !m.schedulerEnabled {
61-
catcher.Info("alerts scheduler: disabled (ALERTS_SCHEDULER_ENABLED=false)", nil)
62-
return
6344
}
64-
catcher.Info("alerts scheduler: enabled — launching goroutine", nil)
65-
go m.scheduler.Start(ctx)
6645
}
6746

6847
func (m *Module) GetAlertHandler() *handler.AlertHandler { return m.alertHandler }
@@ -76,5 +55,3 @@ func (m *Module) GetAlertTagRuleHandler() *handler.AlertTagRuleHandler {
7655
}
7756

7857
func (m *Module) GetAdversaryHandler() *handler.AdversaryHandler { return m.adversaryHandler }
79-
80-
func (m *Module) IsSchedulerEnabled() bool { return m.schedulerEnabled }

backend/modules/alerts/repository/alert_os.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,6 @@ ctx._source.incidentDetail.createdBy = params.createdBy;
4848
ctx._source.incidentDetail.source = params.source;
4949
`
5050

51-
const assignAssetGroupsScript = `
52-
if (params.mapping.containsKey(ctx._source.dataSource)) {
53-
ctx._source.assetGroupId = params.mapping[ctx._source.dataSource].id;
54-
ctx._source.assetGroupName = params.mapping[ctx._source.dataSource].name;
55-
}
56-
`
57-
5851
// ---------------------------------------------------------------------------
5952
// osAlertRepo implements connectors.AlertRepository against the go-sdk `os` client.
6053
// ---------------------------------------------------------------------------
@@ -195,20 +188,3 @@ func (r *osAlertRepo) SearchByIDs(ctx context.Context, alertIDs []string) ([]dom
195188
return alerts, nil
196189
}
197190

198-
func (r *osAlertRepo) AssignAssetGroups(ctx context.Context, mapping map[string]connectors.AssetGroupRef) error {
199-
filter := termQuery("status", int(domain.AlertStatusAutomaticReview))
200-
201-
// Convert mapping to a plain map[string]any for Painless params.
202-
pmapping := make(map[string]any, len(mapping))
203-
for k, v := range mapping {
204-
pmapping[k] = map[string]any{"id": v.ID, "name": v.Name}
205-
}
206-
207-
script := Script{
208-
Source: assignAssetGroupsScript,
209-
Params: map[string]any{
210-
"mapping": pmapping,
211-
},
212-
}
213-
return osUpdateByQuery(ctx, alertIndex, filter, script)
214-
}

backend/modules/alerts/repository/osquery.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ func termsQuery(field string, values []string) map[string]any {
2525
return map[string]any{"terms": map[string]any{field: iValues}}
2626
}
2727

28-
func matchQuery(field string, value any) map[string]any {
29-
return map[string]any{"match": map[string]any{field: value}}
30-
}
31-
3228
// Script holds a Painless script source and its parameters. Always populate
3329
// Params — never embed user input directly in Source.
3430
type Script struct {

backend/modules/alerts/usecase/scheduler.go

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 126 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,16 @@ const (
1818
defaultRefreshSec = 60
1919
rulesRequestTimeout = 10 * time.Second
2020
activeRulesPath = "/api/v1/internal/alert-tag-rules/active"
21+
enrichmentPath = "/api/v1/datasources/enrichment"
2122
)
2223

23-
// RuleSnapshot is the per-rule view the plugin needs at evaluation time.
24-
// Conditions is left as raw JSON until used so we don't decode work we won't
25-
// run (a rule with empty conditions never reaches the evaluator).
2624
type RuleSnapshot struct {
2725
ID uint64
2826
Name string
2927
Conditions []FilterType
3028
TagNames []string
3129
}
3230

33-
// activeRuleWire matches the backend dto.ActiveAlertTagRule payload.
3431
type activeRuleWire struct {
3532
ID uint64 `json:"id"`
3633
Name string `json:"name"`
@@ -161,3 +158,128 @@ func (c *ruleCache) Run(ctx context.Context) {
161158
}
162159
}
163160
}
161+
162+
type datasourceEnrichment struct {
163+
GroupID *uint64
164+
GroupName string
165+
Labels []string
166+
}
167+
168+
type enrichmentWire struct {
169+
Name string `json:"name"`
170+
DataType string `json:"dataType"`
171+
GroupID *uint64 `json:"groupId"`
172+
GroupName string `json:"groupName"`
173+
Labels []string `json:"labels"`
174+
}
175+
176+
type datasourceCache struct {
177+
baseURL string
178+
internalKey string
179+
httpClient *http.Client
180+
refresh time.Duration
181+
182+
mu sync.RWMutex
183+
byName map[string]datasourceEnrichment
184+
}
185+
186+
func newDatasourceCache() *datasourceCache {
187+
cfg := plugins.PluginCfg("com.utmstack")
188+
base := cfg.Get("backend").String()
189+
if base != "" && !strings.HasPrefix(base, "http://") && !strings.HasPrefix(base, "https://") {
190+
base = "http://" + base
191+
}
192+
refresh := time.Duration(defaultRefreshSec) * time.Second
193+
if v := cfg.Get("rulesRefreshSec").Int(); v > 0 {
194+
refresh = time.Duration(v) * time.Second
195+
}
196+
return &datasourceCache{
197+
baseURL: base,
198+
internalKey: cfg.Get("internalKey").String(),
199+
httpClient: &http.Client{Timeout: rulesRequestTimeout},
200+
refresh: refresh,
201+
}
202+
}
203+
204+
func (c *datasourceCache) Lookup(name string) (datasourceEnrichment, bool) {
205+
c.mu.RLock()
206+
defer c.mu.RUnlock()
207+
e, ok := c.byName[name]
208+
return e, ok
209+
}
210+
211+
func (c *datasourceCache) Refresh(ctx context.Context) error {
212+
if c.baseURL == "" || c.internalKey == "" {
213+
return catcher.Error("datasource cache: backend URL or internal key missing", nil, map[string]any{"process": "plugin_com.utmstack.alerts"})
214+
}
215+
216+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+enrichmentPath, nil)
217+
if err != nil {
218+
return catcher.Error("datasource cache: build request failed", err, map[string]any{"process": "plugin_com.utmstack.alerts"})
219+
}
220+
req.Header.Set(internalKeyHeader, c.internalKey)
221+
222+
resp, err := c.httpClient.Do(req)
223+
if err != nil {
224+
return catcher.Error("datasource cache: request failed", err, map[string]any{"process": "plugin_com.utmstack.alerts"})
225+
}
226+
defer func() { _ = resp.Body.Close() }()
227+
228+
body, err := io.ReadAll(resp.Body)
229+
if err != nil {
230+
return catcher.Error("datasource cache: read body failed", err, map[string]any{"process": "plugin_com.utmstack.alerts"})
231+
}
232+
if resp.StatusCode >= 400 {
233+
return catcher.Error("datasource cache: backend returned error", nil, map[string]any{
234+
"status": resp.StatusCode,
235+
"body": string(body),
236+
"process": "plugin_com.utmstack.alerts",
237+
})
238+
}
239+
240+
var wire []enrichmentWire
241+
if err := json.Unmarshal(body, &wire); err != nil {
242+
return catcher.Error("datasource cache: decode failed", err, map[string]any{"process": "plugin_com.utmstack.alerts"})
243+
}
244+
245+
next := make(map[string]datasourceEnrichment, len(wire))
246+
for _, w := range wire {
247+
// Keyed by name; a host's group is the same across its data types, so a
248+
// repeated name just overwrites with equivalent enrichment.
249+
next[w.Name] = datasourceEnrichment{
250+
GroupID: w.GroupID,
251+
GroupName: w.GroupName,
252+
Labels: w.Labels,
253+
}
254+
}
255+
256+
c.mu.Lock()
257+
c.byName = next
258+
c.mu.Unlock()
259+
return nil
260+
}
261+
262+
func (c *datasourceCache) Run(ctx context.Context) {
263+
defer func() {
264+
if r := recover(); r != nil {
265+
_ = catcher.Error("datasource cache: recovered from panic in Run", nil, map[string]any{
266+
"panic": r,
267+
"process": "plugin_com.utmstack.alerts",
268+
})
269+
}
270+
}()
271+
272+
ticker := time.NewTicker(c.refresh)
273+
defer ticker.Stop()
274+
275+
for {
276+
select {
277+
case <-ctx.Done():
278+
return
279+
case <-ticker.C:
280+
refreshCtx, cancel := context.WithTimeout(context.Background(), rulesRequestTimeout)
281+
_ = c.Refresh(refreshCtx)
282+
cancel()
283+
}
284+
}
285+
}

0 commit comments

Comments
 (0)