@@ -2,6 +2,7 @@ package main
22
33import (
44 "context"
5+ "encoding/json"
56 "errors"
67 "fmt"
78 "net/http"
@@ -418,14 +419,67 @@ func processPartition(ctx context.Context, pc *azeventhubs.ProcessorPartitionCli
418419 }
419420
420421 for _ , event := range events {
421- _ = plugins .EnqueueLog (& plugins.Log {
422- Id : uuid .New ().String (),
423- TenantId : defaultTenant ,
424- DataType : "azure" ,
425- DataSource : groupName ,
426- Timestamp : time .Now ().UTC ().Format (time .RFC3339Nano ),
427- Raw : string (event .Body ),
428- }, "com.utmstack.azure" )
422+ var logData map [string ]any
423+ if err := json .Unmarshal (event .Body , & logData ); err != nil {
424+ _ = catcher .Error ("cannot parse event body" , err , map [string ]any {
425+ "group" : groupName ,
426+ "partitionID" : pc .PartitionID (),
427+ "process" : "plugin_com.utmstack.azure" ,
428+ })
429+ continue
430+ }
431+
432+ if records , ok := logData ["records" ].([]any ); ok && len (records ) > 0 {
433+ for _ , record := range records {
434+ recordMap , ok := record .(map [string ]any )
435+ if ! ok {
436+ _ = catcher .Error ("invalid record format in records array" , nil , map [string ]any {
437+ "group" : groupName ,
438+ "partitionID" : pc .PartitionID (),
439+ "process" : "plugin_com.utmstack.azure" ,
440+ })
441+ continue
442+ }
443+
444+ jsonLog , err := json .Marshal (recordMap )
445+ if err != nil {
446+ _ = catcher .Error ("cannot encode record to JSON" , err , map [string ]any {
447+ "group" : groupName ,
448+ "partitionID" : pc .PartitionID (),
449+ "process" : "plugin_com.utmstack.azure" ,
450+ })
451+ continue
452+ }
453+
454+ _ = plugins .EnqueueLog (& plugins.Log {
455+ Id : uuid .New ().String (),
456+ TenantId : defaultTenant ,
457+ DataType : "azure" ,
458+ DataSource : groupName ,
459+ Timestamp : time .Now ().UTC ().Format (time .RFC3339Nano ),
460+ Raw : string (jsonLog ),
461+ }, "com.utmstack.azure" )
462+ }
463+ } else {
464+ jsonLog , err := json .Marshal (logData )
465+ if err != nil {
466+ _ = catcher .Error ("cannot encode log to JSON" , err , map [string ]any {
467+ "group" : groupName ,
468+ "partitionID" : pc .PartitionID (),
469+ "process" : "plugin_com.utmstack.azure" ,
470+ })
471+ continue
472+ }
473+
474+ _ = plugins .EnqueueLog (& plugins.Log {
475+ Id : uuid .New ().String (),
476+ TenantId : defaultTenant ,
477+ DataType : "azure" ,
478+ DataSource : groupName ,
479+ Timestamp : time .Now ().UTC ().Format (time .RFC3339Nano ),
480+ Raw : string (jsonLog ),
481+ }, "com.utmstack.azure" )
482+ }
429483 }
430484
431485 if err := pc .UpdateCheckpoint (context .Background (), events [len (events )- 1 ], nil ); err != nil {
0 commit comments