@@ -24,21 +24,25 @@ func HandleKinesis(ctx context.Context, event json.RawMessage, cfg *config.Confi
2424 }
2525
2626 for _ , record := range kinesisEvent .Records {
27- cwEvent := events.CloudwatchLogsEvent {
28- AWSLogs : events.CloudwatchLogsRawData {
29- Data : base64 .StdEncoding .EncodeToString (record .Kinesis .Data ),
30- },
31- }
32-
33- cwRaw , err := json .Marshal (cwEvent )
34- if err != nil {
35- return fmt .Errorf ("marshal cloudwatch event from kinesis: %w" , err )
36- }
37-
38- if err := HandleCloudwatchLogs (ctx , cwRaw , cfg , out ); err != nil {
27+ if err := handleKinesisRecord (ctx , record , cfg , out ); err != nil {
3928 slog .WarnContext (ctx , "skipping kinesis record" , "error" , err )
4029 continue
4130 }
4231 }
4332 return nil
4433}
34+
35+ func handleKinesisRecord (ctx context.Context , record events.KinesisEventRecord , cfg * config.Config , out chan <- model.CloudwatchLogEntry ) error {
36+ cwEvent := events.CloudwatchLogsEvent {
37+ AWSLogs : events.CloudwatchLogsRawData {
38+ Data : base64 .StdEncoding .EncodeToString (record .Kinesis .Data ),
39+ },
40+ }
41+
42+ cwRaw , err := json .Marshal (cwEvent )
43+ if err != nil {
44+ return fmt .Errorf ("marshal cloudwatch event from kinesis: %w" , err )
45+ }
46+
47+ return HandleCloudwatchLogs (ctx , cwRaw , cfg , out )
48+ }
0 commit comments