Skip to content

Commit a84fcc0

Browse files
committed
refactor(crowdstrike): migrate from polling to real-time event streaming
- Replaced polling-based event collection with real-time streaming architecture - Implemented persistent stream management with automatic reconnection - Added dynamic configuration reloading with live stream updates - Introduced per-stream offset tracking to prevent duplicate events
1 parent 6f2fe2e commit a84fcc0

File tree

3 files changed

+348
-142
lines changed

3 files changed

+348
-142
lines changed

plugins/crowdstrike/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func infiniteRetryIfXError(f func() error, exception string) error {
5858
_ = catcher.Error("An error occurred (%s), will keep retrying indefinitely...", err, map[string]any{"process": "plugin_com.utmstack.crowdstrike"})
5959
xErrorWasLogged = true
6060
}
61-
time.Sleep(wait)
61+
time.Sleep(reconnectDelay)
6262
continue
6363
}
6464

plugins/crowdstrike/config/config.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@ const (
2323
)
2424

2525
var (
26-
cnf *ConfigurationSection
27-
mu sync.Mutex
28-
26+
cnf *ConfigurationSection
27+
mu sync.Mutex
28+
configUpdateChan chan *ConfigurationSection
2929
internalKey string
3030
modulesConfigHost string
3131
)
3232

33+
func init() {
34+
configUpdateChan = make(chan *ConfigurationSection, 1)
35+
}
36+
3337
func GetConfig() *ConfigurationSection {
3438
mu.Lock()
3539
defer mu.Unlock()
@@ -39,6 +43,10 @@ func GetConfig() *ConfigurationSection {
3943
return cnf
4044
}
4145

46+
func GetConfigUpdateChannel() <-chan *ConfigurationSection {
47+
return configUpdateChan
48+
}
49+
4250
func StartConfigurationSystem() {
4351
for {
4452
pluginConfig := plugins.PluginCfg("com.utmstack")
@@ -133,7 +141,18 @@ func StartConfigurationSystem() {
133141
switch message := in.Payload.(type) {
134142
case *BiDirectionalMessage_Config:
135143
catcher.Info("Received configuration update", map[string]any{"process": "plugin_com.utmstack.crowdstrike"})
144+
145+
mu.Lock()
136146
cnf = message.Config
147+
mu.Unlock()
148+
149+
select {
150+
case configUpdateChan <- message.Config:
151+
152+
default:
153+
154+
catcher.Info("Configuration update channel full, skipping notification", map[string]any{"process": "plugin_com.utmstack.crowdstrike"})
155+
}
137156
}
138157
}
139158
}

0 commit comments

Comments
 (0)