Skip to content

Commit 413ea27

Browse files
committed
refactor(azure-plugin): extracts event processing logic into separate functions to handle JSON format detection (array vs object)
1 parent 5945ee1 commit 413ea27

File tree

1 file changed

+85
-61
lines changed

1 file changed

+85
-61
lines changed

plugins/azure/main.go

Lines changed: 85 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -419,67 +419,7 @@ func processPartition(ctx context.Context, pc *azeventhubs.ProcessorPartitionCli
419419
}
420420

421421
for _, event := range events {
422-
var logData map[string]any
423-
if err := json.Unmarshal(event.Body, &logData); err != nil {
424-
_ = catcher.Error("cannot parse event body", err, map[string]any{
425-
"group": groupName,
426-
"partitionID": pc.PartitionID(),
427-
"process": "plugin_com.utmstack.azure",
428-
})
429-
continue
430-
}
431-
432-
if records, ok := logData["records"].([]any); ok && len(records) > 0 {
433-
for _, record := range records {
434-
recordMap, ok := record.(map[string]any)
435-
if !ok {
436-
_ = catcher.Error("invalid record format in records array", nil, map[string]any{
437-
"group": groupName,
438-
"partitionID": pc.PartitionID(),
439-
"process": "plugin_com.utmstack.azure",
440-
})
441-
continue
442-
}
443-
444-
jsonLog, err := json.Marshal(recordMap)
445-
if err != nil {
446-
_ = catcher.Error("cannot encode record to JSON", err, map[string]any{
447-
"group": groupName,
448-
"partitionID": pc.PartitionID(),
449-
"process": "plugin_com.utmstack.azure",
450-
})
451-
continue
452-
}
453-
454-
_ = plugins.EnqueueLog(&plugins.Log{
455-
Id: uuid.New().String(),
456-
TenantId: defaultTenant,
457-
DataType: "azure",
458-
DataSource: groupName,
459-
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
460-
Raw: string(jsonLog),
461-
}, "com.utmstack.azure")
462-
}
463-
} else {
464-
jsonLog, err := json.Marshal(logData)
465-
if err != nil {
466-
_ = catcher.Error("cannot encode log to JSON", err, map[string]any{
467-
"group": groupName,
468-
"partitionID": pc.PartitionID(),
469-
"process": "plugin_com.utmstack.azure",
470-
})
471-
continue
472-
}
473-
474-
_ = plugins.EnqueueLog(&plugins.Log{
475-
Id: uuid.New().String(),
476-
TenantId: defaultTenant,
477-
DataType: "azure",
478-
DataSource: groupName,
479-
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
480-
Raw: string(jsonLog),
481-
}, "com.utmstack.azure")
482-
}
422+
processEvent(event.Body, groupName)
483423
}
484424

485425
if err := pc.UpdateCheckpoint(context.Background(), events[len(events)-1], nil); err != nil {
@@ -518,6 +458,90 @@ func getAzureProcessor(group *config.ModuleGroup) AzureConfig {
518458
return azurePro
519459
}
520460

461+
func processEvent(eventBody []byte, groupName string) {
462+
var firstByte byte
463+
for _, b := range eventBody {
464+
if b != ' ' && b != '\t' && b != '\n' && b != '\r' {
465+
firstByte = b
466+
break
467+
}
468+
}
469+
470+
switch firstByte {
471+
case '[':
472+
processArrayEvent(eventBody, groupName)
473+
case '{':
474+
processObjectEvent(eventBody, groupName)
475+
default:
476+
_ = catcher.Error("invalid JSON format: expected array or object", nil, map[string]any{
477+
"group": groupName,
478+
"process": "plugin_com.utmstack.azure",
479+
})
480+
}
481+
}
482+
483+
func processArrayEvent(eventBody []byte, groupName string) {
484+
var records []map[string]any
485+
if err := json.Unmarshal(eventBody, &records); err != nil {
486+
_ = catcher.Error("cannot parse event body as array", err, map[string]any{
487+
"group": groupName,
488+
"process": "plugin_com.utmstack.azure",
489+
})
490+
return
491+
}
492+
493+
for _, record := range records {
494+
enqueueRecord(record, groupName)
495+
}
496+
}
497+
498+
func processObjectEvent(eventBody []byte, groupName string) {
499+
var logData map[string]any
500+
if err := json.Unmarshal(eventBody, &logData); err != nil {
501+
_ = catcher.Error("cannot parse event body as object", err, map[string]any{
502+
"group": groupName,
503+
"process": "plugin_com.utmstack.azure",
504+
})
505+
return
506+
}
507+
508+
if records, ok := logData["records"].([]any); ok && len(records) > 0 {
509+
for _, record := range records {
510+
recordMap, ok := record.(map[string]any)
511+
if !ok {
512+
_ = catcher.Error("invalid record format in records array", nil, map[string]any{
513+
"group": groupName,
514+
"process": "plugin_com.utmstack.azure",
515+
})
516+
continue
517+
}
518+
enqueueRecord(recordMap, groupName)
519+
}
520+
} else {
521+
enqueueRecord(logData, groupName)
522+
}
523+
}
524+
525+
func enqueueRecord(record map[string]any, groupName string) {
526+
jsonLog, err := json.Marshal(record)
527+
if err != nil {
528+
_ = catcher.Error("cannot encode record to JSON", err, map[string]any{
529+
"group": groupName,
530+
"process": "plugin_com.utmstack.azure",
531+
})
532+
return
533+
}
534+
535+
_ = plugins.EnqueueLog(&plugins.Log{
536+
Id: uuid.New().String(),
537+
TenantId: defaultTenant,
538+
DataType: "azure",
539+
DataSource: groupName,
540+
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
541+
Raw: string(jsonLog),
542+
}, "com.utmstack.azure")
543+
}
544+
521545
func connectionChecker(url string) error {
522546
checkConn := func() error {
523547
if err := checkConnection(url); err != nil {

0 commit comments

Comments
 (0)