66package parsing
77
88import (
9+ "bytes"
10+ "compress/gzip"
911 "context"
10- "encoding/base64"
1112 "encoding/json"
1213 "fmt"
1314 "log/slog"
@@ -25,18 +26,13 @@ func HandleKinesis(ctx context.Context, event json.RawMessage, cfg *config.Confi
2526 }
2627
2728 for _ , record := range kinesisEvent .Records {
28- cwEvent := events.CloudwatchLogsEvent {
29- AWSLogs : events.CloudwatchLogsRawData {
30- Data : base64 .StdEncoding .EncodeToString (record .Kinesis .Data ),
31- },
32- }
33-
34- cwRaw , err := json .Marshal (cwEvent )
29+ data , err := decodeCloudwatchLogsData (record .Kinesis .Data )
3530 if err != nil {
36- return fmt .Errorf ("marshal cloudwatch event from kinesis: %w" , err )
31+ slog .WarnContext (ctx , "skipping kinesis record" , "error" , err )
32+ continue
3733 }
3834
39- entries , err := parseCloudwatchLogs (ctx , cwRaw , cfg )
35+ entries , err := buildCloudwatchLogEntries (ctx , data , cfg )
4036 if err != nil {
4137 slog .WarnContext (ctx , "skipping kinesis record" , "error" , err )
4238 continue
@@ -50,3 +46,17 @@ func HandleKinesis(ctx context.Context, event json.RawMessage, cfg *config.Confi
5046 }
5147 return nil
5248}
49+
50+ func decodeCloudwatchLogsData (data []byte ) (events.CloudwatchLogsData , error ) {
51+ zr , err := gzip .NewReader (bytes .NewReader (data ))
52+ if err != nil {
53+ return events.CloudwatchLogsData {}, fmt .Errorf ("gzip: %w" , err )
54+ }
55+ defer zr .Close ()
56+
57+ var d events.CloudwatchLogsData
58+ if err := json .NewDecoder (zr ).Decode (& d ); err != nil {
59+ return events.CloudwatchLogsData {}, fmt .Errorf ("decode: %w" , err )
60+ }
61+ return d , nil
62+ }
0 commit comments