Skip to content

Commit d191ce9

Browse files
committed
Subscribe arm agent to new windows channels
1 parent 4e66ae8 commit d191ce9

4 files changed

Lines changed: 114 additions & 63 deletions

File tree

.github/workflows/v11-principal-alpha-deployment.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ on:
44
workflow_dispatch:
55
inputs:
66
version_tag:
7-
description: "Version to deploy."
7+
description: "Version to deploy.(e.g., v1.0.0-alpha.1)"
88
required: true
99
event_processor_tag:
10-
description: "Event processor version to use for this deployment."
10+
description: "Event processor version to use for this deployment.(e.g., 1.0.0-beta)"
1111
required: true
1212

1313
jobs:

.github/workflows/v11-used-build.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ jobs:
6363
$env:GOOS = "linux"
6464
$env:GOARCH = "amd64"
6565
go build -o utmstack_agent_service -v .
66-
# $env:GOARCH = "arm64"
67-
# go build -o utmstack_agent_service_arm64 -v .
6866
6967
$env:GOOS = "windows"
7068
$env:GOARCH = "amd64"

agent/collectors/windows_amd64.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ func (w Windows) Install() error {
7373
}
7474

7575
func (w Windows) SendLogs() {
76-
host, _ := os.Hostname()
7776
logLinesChan := make(chan string)
7877
path := utils.GetMyPath()
7978
winbLogPath := filepath.Join(path, "beats", "winlogbeat", "logs")

agent/collectors/windows_arm64.go

Lines changed: 112 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
"os/signal"
1313
"strconv"
1414
"strings"
15+
"sync"
1516
"syscall"
17+
"time"
1618
"unsafe"
1719

1820
"github.com/threatwinds/go-sdk/plugins"
@@ -75,8 +77,10 @@ type EventSubscription struct {
7577
Channel string
7678
Query string
7779
Errors chan error
78-
Callback func(event *Event)
7980
winAPIHandle windows.Handle
81+
82+
mu sync.Mutex
83+
running bool
8084
}
8185

8286
const (
@@ -91,9 +95,13 @@ var (
9195
procEvtSubscribe = modwevtapi.NewProc("EvtSubscribe")
9296
procEvtRender = modwevtapi.NewProc("EvtRender")
9397
procEvtClose = modwevtapi.NewProc("EvtClose")
98+
incomingEvents = make(chan string, 1024)
9499
)
95100

96101
func (evtSub *EventSubscription) Create() error {
102+
evtSub.mu.Lock()
103+
defer evtSub.mu.Unlock()
104+
97105
if evtSub.winAPIHandle != 0 {
98106
return fmt.Errorf("windows_events: subscription has already been created")
99107
}
@@ -132,6 +140,9 @@ func (evtSub *EventSubscription) Create() error {
132140
}
133141

134142
func (evtSub *EventSubscription) Close() error {
143+
evtSub.mu.Lock()
144+
defer evtSub.mu.Unlock()
145+
135146
if evtSub.winAPIHandle == 0 {
136147
return fmt.Errorf("windows_events: no active subscription to close")
137148
}
@@ -146,47 +157,70 @@ func (evtSub *EventSubscription) Close() error {
146157
func (evtSub *EventSubscription) winAPICallback(action, userContext, event uintptr) uintptr {
147158
switch action {
148159
case evtSubscribeActionError:
149-
evtSub.Errors <- fmt.Errorf("windows_events: error in callback, code: %x", uint16(event))
150-
case evtSubscribeActionDeliver:
151-
bufferSize := uint32(4096)
152-
for {
153-
renderSpace := make([]uint16, bufferSize/2)
154-
bufferUsed := uint32(0)
155-
propertyCount := uint32(0)
156-
ret, _, err := procEvtRender.Call(
157-
0,
158-
event,
159-
evtRenderEventXML,
160-
uintptr(bufferSize),
161-
uintptr(unsafe.Pointer(&renderSpace[0])),
162-
uintptr(unsafe.Pointer(&bufferUsed)),
163-
uintptr(unsafe.Pointer(&propertyCount)),
164-
)
165-
if ret == 0 {
166-
if err == windows.ERROR_INSUFFICIENT_BUFFER {
167-
bufferSize *= 2
168-
continue
160+
err := fmt.Errorf("windows_events: error in callback, code: %x", uint16(event))
161+
evtSub.Errors <- err
162+
163+
go func(channel string) {
164+
utils.Logger.LogF(100, "Attempting to resubscribe to channel: %s after error: %v", channel, err)
165+
evtSub.mu.Lock()
166+
defer evtSub.mu.Unlock()
167+
168+
_ = evtSub.Close()
169+
170+
for {
171+
time.Sleep(5 * time.Second)
172+
if err := evtSub.Create(); err != nil {
173+
utils.Logger.ErrorF("Retry failed for channel %s: %s", channel, err)
174+
} else {
175+
utils.Logger.LogF(100, "Resubscribed to channel: %s", channel)
176+
break
169177
}
170-
evtSub.Errors <- fmt.Errorf("windows_events: failed to render event: %w", err)
171-
return 0
172-
}
173-
xmlStr := windows.UTF16ToString(renderSpace)
174-
xmlStr = cleanXML(xmlStr)
175-
176-
dataParsed := new(Event)
177-
if err := xml.Unmarshal([]byte(xmlStr), dataParsed); err != nil {
178-
evtSub.Errors <- fmt.Errorf("windows_events: failed to parse XML: %s", err)
179-
} else {
180-
evtSub.Callback(dataParsed)
181178
}
179+
}(evtSub.Channel)
180+
181+
case evtSubscribeActionDeliver:
182+
utils.Logger.LogF(100, "Received event from channel: %s", evtSub.Channel)
183+
xmlStr, err := quickRenderXML(event)
184+
if err != nil {
185+
evtSub.Errors <- fmt.Errorf("render in callback: %v", err)
182186
break
183187
}
188+
select {
189+
case incomingEvents <- xmlStr:
190+
default:
191+
utils.Logger.ErrorF("incomingEvents lleno: evento descartado")
192+
}
184193
default:
185194
evtSub.Errors <- fmt.Errorf("windows_events: unsupported action in callback: %x", uint16(action))
186195
}
187196
return 0
188197
}
189198

199+
func quickRenderXML(h uintptr) (string, error) {
200+
bufSize := uint32(4096)
201+
for {
202+
space := make([]uint16, bufSize/2)
203+
used := uint32(0)
204+
prop := uint32(0)
205+
206+
ret, _, err := procEvtRender.Call(
207+
0, h, evtRenderEventXML,
208+
uintptr(bufSize),
209+
uintptr(unsafe.Pointer(&space[0])),
210+
uintptr(unsafe.Pointer(&used)),
211+
uintptr(unsafe.Pointer(&prop)),
212+
)
213+
if ret == 0 {
214+
if err == windows.ERROR_INSUFFICIENT_BUFFER {
215+
bufSize *= 2
216+
continue
217+
}
218+
return "", err
219+
}
220+
return cleanXML(windows.UTF16ToString(space)), nil
221+
}
222+
}
223+
190224
func cleanXML(xmlStr string) string {
191225
xmlStr = strings.TrimSpace(xmlStr)
192226
if idx := strings.Index(xmlStr, "<?xml"); idx > 0 {
@@ -211,39 +245,21 @@ func getCollectorsInstances() []Collector {
211245

212246
func (w Windows) SendLogs() {
213247
errorsChan := make(chan error, 10)
248+
go eventWorker()
214249

215-
callback := func(event *Event) {
216-
eventJSON, err := convertEventToJSON(event)
217-
if err != nil {
218-
utils.Logger.ErrorF("error converting event to JSON: %v", err)
219-
return
220-
}
221-
host, err := os.Hostname()
222-
if err != nil {
223-
utils.Logger.ErrorF("error getting hostname: %v", err)
224-
host = "unknown"
225-
}
226-
validatedLog, _, err := validations.ValidateString(eventJSON, false)
227-
if err != nil {
228-
utils.Logger.LogF(100, "error validating log: %s: %v", eventJSON, err)
229-
return
230-
}
231-
logservice.LogQueue <- &plugins.Log{
232-
DataType: string(config.DataTypeWindowsAgent),
233-
DataSource: host,
234-
Raw: validatedLog,
235-
}
250+
channels := []string{
251+
"Security", "Application", "System", "Windows Powershell", "Microsoft-Windows-Powershell/Operational", "ForwardedEvents",
252+
"Microsoft-Windows-WinLogon/Operational", "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall",
253+
"Microsoft-Windows-Windows Defender/Operational",
236254
}
237255

238-
channels := []string{"Security", "Application", "System"}
239256
var subscriptions []*EventSubscription
240257

241258
for _, channel := range channels {
242259
sub := &EventSubscription{
243-
Channel: channel,
244-
Query: "*",
245-
Errors: errorsChan,
246-
Callback: callback,
260+
Channel: channel,
261+
Query: "*",
262+
Errors: errorsChan,
247263
}
248264
if err := sub.Create(); err != nil {
249265
utils.Logger.ErrorF("Error subscribing to channel %s: %s", channel, err)
@@ -271,6 +287,44 @@ func (w Windows) SendLogs() {
271287
utils.Logger.LogF(100, "Agent finished successfully.")
272288
}
273289

290+
func eventWorker() {
291+
host, err := os.Hostname()
292+
if err != nil {
293+
utils.Logger.ErrorF("error getting hostname: %v", err)
294+
host = "unknown"
295+
}
296+
297+
for xmlStr := range incomingEvents {
298+
ev := new(Event)
299+
if err := xml.Unmarshal([]byte(xmlStr), ev); err != nil {
300+
utils.Logger.ErrorF("unmarshal error: %v", err)
301+
continue
302+
}
303+
304+
eventJSON, err := convertEventToJSON(ev)
305+
if err != nil {
306+
utils.Logger.ErrorF("toJSON error: %v", err)
307+
continue
308+
}
309+
310+
validatedLog, _, err := validations.ValidateString(eventJSON, false)
311+
if err != nil {
312+
utils.Logger.LogF(100, "validation error: %s: %v", eventJSON, err)
313+
continue
314+
}
315+
316+
select {
317+
case logservice.LogQueue <- &plugins.Log{
318+
DataSource: host,
319+
DataType: string(config.DataTypeWindowsAgent),
320+
Raw: validatedLog,
321+
}:
322+
default:
323+
utils.Logger.LogF(100, "LogQueue full: event discarded")
324+
}
325+
}
326+
}
327+
274328
func convertEventToJSON(event *Event) (string, error) {
275329
eventMap := map[string]interface{}{
276330
"timestamp": event.System.TimeCreated.SystemTime,

0 commit comments

Comments
 (0)