From 328b2042f0462e80178a5531c6f93891c21b6953 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 12 Jun 2026 13:24:38 +0200 Subject: [PATCH 1/8] perf(go-forwarder): benchmarking decoder vs unmarshal on parsing --- .../internal/parsing/bench_test.go | 126 ++++++++++++++++++ .../internal/parsing/content.go | 5 +- .../internal/parsing/eventbridge_unmarshal.go | 61 +++++++++ .../internal/parsing/parse2.go | 83 ++++++++++++ .../internal/parsing/sns.go | 4 +- .../internal/parsing/sns_marshal.go | 32 +++++ .../internal/parsing/sqs.go | 6 +- .../internal/parsing/sqs_marshal.go | 62 +++++++++ 8 files changed, 372 insertions(+), 7 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/parsing/bench_test.go create mode 100644 aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go create mode 100644 aws/logs_monitoring_go/internal/parsing/parse2.go create mode 100644 aws/logs_monitoring_go/internal/parsing/sns_marshal.go create mode 100644 aws/logs_monitoring_go/internal/parsing/sqs_marshal.go diff --git a/aws/logs_monitoring_go/internal/parsing/bench_test.go b/aws/logs_monitoring_go/internal/parsing/bench_test.go new file mode 100644 index 000000000..631f7a887 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/bench_test.go @@ -0,0 +1,126 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "testing" +) + +var ( + benchCloudWatch = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) + benchS3 = json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`) + benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`) + benchSQSS3 = json.RawMessage(`{"Records":[` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b3\"},\"object\":{\"key\":\"k3\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b4\"},\"object\":{\"key\":\"k4\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b5\"},\"object\":{\"key\":\"k5\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b6\"},\"object\":{\"key\":\"k6\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b7\"},\"object\":{\"key\":\"k7\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b8\"},\"object\":{\"key\":\"k8\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b9\"},\"object\":{\"key\":\"k9\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b10\"},\"object\":{\"key\":\"k10\"}}}]}"}` + + `]}`) + benchSQSSNSS3 = json.RawMessage(`{"Records":[` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` + + `]}`) + benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`) +) + +func BenchmarkParse_Decoder_CloudWatch(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchCloudWatch) + } +} + +func BenchmarkParse_Unmarshal_CloudWatch(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchCloudWatch) + } +} + +func BenchmarkParse_Decoder_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchS3) + } +} + +func BenchmarkParse_Unmarshal_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchS3) + } +} + +func BenchmarkParse_Decoder_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSNSS3) + } +} + +func BenchmarkParse_Unmarshal_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchSNSS3) + } +} + +func BenchmarkParse_Decoder_SQS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSQSS3) + } +} + +func BenchmarkParse_Unmarshal_SQS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchSQSS3) + } +} + +func BenchmarkParse_Decoder_SQS_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSQSSNSS3) + } +} + +func BenchmarkParse_Unmarshal_SQS_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchSQSSNSS3) + } +} + +func BenchmarkParse_Decoder_EventBridge(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchEventBridge) + } +} + +func BenchmarkParse_Unmarshal_EventBridge(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = ParseUnmarshal(benchEventBridge) + } +} diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go index d52101100..b04fe7b2a 100644 --- a/aws/logs_monitoring_go/internal/parsing/content.go +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -21,6 +21,7 @@ const ( ) type ParsedEvent struct { - ContentType ContentType - Payload json.RawMessage + ContentType ContentType + Payload json.RawMessage + SQSReceiptHandle string } diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go b/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go new file mode 100644 index 000000000..165b879ba --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go @@ -0,0 +1,61 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/aws/aws-lambda-go/events" +) + +type s3EventBridgeDetail struct { + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + } `json:"object"` +} + +func eventBridgeUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { + var eventBridgeEvent events.EventBridgeEvent + if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { + return nil, err + } + + if eventBridgeEvent.Source == eventSourceS3 && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { + var s3eb s3EventBridgeDetail + if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { + return nil, err + } + + payload, err := mapS3EventBridge(s3eb) + if err != nil { + return nil, err + } + + return []ParsedEvent{{ContentType: ContentTypeS3, Payload: payload}}, nil + } + return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil +} + +func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { + s3EventRecord := events.S3EventRecord{ + EventSource: eventSourceS3, + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: eb.Bucket.Name}, + Object: events.S3Object{Key: eb.Object.Key, URLDecodedKey: eb.Object.Key}, + }, + } + payload, err := json.Marshal(s3EventRecord) + if err != nil { + return nil, fmt.Errorf("marshal: %w", err) + } + + return payload, nil +} diff --git a/aws/logs_monitoring_go/internal/parsing/parse2.go b/aws/logs_monitoring_go/internal/parsing/parse2.go new file mode 100644 index 000000000..1d3cf0db7 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse2.go @@ -0,0 +1,83 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "errors" + "fmt" +) + +type eventDiscriminator struct { + AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs + Records []struct { + EventSource string `json:"eventSource"` + } `json:"Records"` // S3, SQS, SNS + Detail json.RawMessage `json:"detail"` // EventBridge + Retry json.RawMessage `json:"retry"` +} + +type recordsDiscriminator struct { + Records json.RawMessage `json:"Records"` +} + +func ParseUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { + var disc eventDiscriminator + if err := json.Unmarshal(event, &disc); err != nil { + return nil, err + } + + switch { + case disc.AWSLogs != nil: + return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + + case disc.Retry != nil: + return []ParsedEvent{{ContentType: ContentTypeRetry}}, nil + + case len(disc.Records) > 0: + parsed, err := recordsUnmarshal(event, disc) + if err != nil { + return nil, fmt.Errorf("records: %w", err) + } + return parsed, nil + + case disc.Detail != nil: + parsed, err := eventBridgeUnmarshal(event) + if err != nil { + return nil, fmt.Errorf("eventbridge: %w", err) + } + return parsed, nil + } + + return nil, errors.New("unsupported event") +} + +func recordsUnmarshal(event json.RawMessage, disc eventDiscriminator) ([]ParsedEvent, error) { + switch disc.Records[0].EventSource { + case eventSourceS3: + return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil + + case eventSourceKinesis: + return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil + + case eventSourceSQS: + parsed, err := sqsUnmarshal(event) + if err != nil { + return nil, fmt.Errorf("sqs: %w", err) + } + return parsed, nil + + case eventSourceSNS: + parsed, err := snsUnmarshal(event) + if err != nil { + return nil, fmt.Errorf("sns: %w", err) + } + return parsed, nil + + default: + return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) + } +} diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index 71a68fd95..ea9ca6bc8 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -46,11 +46,11 @@ func parseSNS(event json.RawMessage) ([]ParsedEvent, error) { inner := json.RawMessage(message) if isS3(inner) { - parsed = append(parsed, ParsedEvent{ContentTypeS3, inner}) + parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: inner}) continue } - parsed = append(parsed, ParsedEvent{ContentTypeSNS, record}) + parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: record}) } return parsed, nil diff --git a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go new file mode 100644 index 000000000..43b00ca91 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go @@ -0,0 +1,32 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + + "github.com/aws/aws-lambda-go/events" +) + +func snsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { + var snsEvent events.SNSEvent + if err := json.Unmarshal(event, &snsEvent); err != nil { + return nil, err + } + + var parsed []ParsedEvent + for _, record := range snsEvent.Records { + inner := record.SNS.Message + if err := json.Unmarshal([]byte(inner), &recordsDiscriminator{}); err != nil { + parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: json.RawMessage(inner)}) + continue + } + + parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: event}) + } + + return parsed, nil +} diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index 2d5bb6964..bee33f8f9 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -91,7 +91,7 @@ func parseSQSBody(body string) (ParsedEvent, error) { } case recordsKey: if isS3(inner) { - return ParsedEvent{ContentTypeS3, inner}, nil + return ParsedEvent{ContentType: ContentTypeS3, Payload: inner}, nil } return ParsedEvent{}, errUnknownEvent default: @@ -104,9 +104,9 @@ func parseSQSBody(body string) (ParsedEvent, error) { if typ == "Notification" && message != "" { msg := json.RawMessage(message) if isS3(msg) { - return ParsedEvent{ContentTypeS3, msg}, nil + return ParsedEvent{ContentType: ContentTypeS3, Payload: msg}, nil } - return ParsedEvent{ContentTypeSNS, inner}, nil + return ParsedEvent{ContentType: ContentTypeSNS, Payload: inner}, nil } return ParsedEvent{}, errUnknownEvent diff --git a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go new file mode 100644 index 000000000..e27b65a85 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go @@ -0,0 +1,62 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + + "github.com/aws/aws-lambda-go/events" +) + +type sqsBodyDiscriminator struct { + Type string `json:"Type"` // SNS inside SQS + Message string `json:"Message"` // SNS inside SQS + Records json.RawMessage `json:"Records"` // S3 inside SQS +} + +func sqsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { + var sqsEvent events.SQSEvent + if err := json.Unmarshal(event, &sqsEvent); err != nil { + return nil, err + } + + var parsed []ParsedEvent + for _, msg := range sqsEvent.Records { + pe, err := sqsBody(msg.Body) + if err != nil { + return nil, err + } + + pe.SQSReceiptHandle = msg.ReceiptHandle + parsed = append(parsed, pe) + } + + return parsed, nil +} + +func sqsBody(body string) (ParsedEvent, error) { + raw := json.RawMessage(body) + + var disc sqsBodyDiscriminator + if err := json.Unmarshal(raw, &disc); err != nil { + return ParsedEvent{}, err + } + + switch { + case disc.Records != nil: + return ParsedEvent{ContentType: ContentTypeS3, Payload: raw}, nil + + case disc.Type == "Notification" && disc.Message != "": + var innerDisc recordsDiscriminator + if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err != nil { + return ParsedEvent{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil + } + + return ParsedEvent{ContentType: ContentTypeSNS, Payload: raw}, nil + } + + return ParsedEvent{}, errUnknownEvent +} From 18e589308637ae9ff06ad888707fef09e458e75c Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 17 Jun 2026 15:28:40 +0200 Subject: [PATCH 2/8] avoid stuttering --- .../internal/parsing/content.go | 2 +- .../internal/parsing/eventbridge.go | 6 ++--- .../internal/parsing/eventbridge_unmarshal.go | 6 ++--- .../internal/parsing/parse.go | 12 ++++----- .../internal/parsing/parse2.go | 12 ++++----- .../internal/parsing/sns.go | 8 +++--- .../internal/parsing/sns_marshal.go | 8 +++--- .../internal/parsing/sqs.go | 26 +++++++++---------- .../internal/parsing/sqs_marshal.go | 16 ++++++------ 9 files changed, 48 insertions(+), 48 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go index b04fe7b2a..8818e7d62 100644 --- a/aws/logs_monitoring_go/internal/parsing/content.go +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -20,7 +20,7 @@ const ( ContentTypeRetry ) -type ParsedEvent struct { +type Event struct { ContentType ContentType Payload json.RawMessage SQSReceiptHandle string diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go index becbb2279..78ab79116 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -19,7 +19,7 @@ const ( eventBridgeDetailTypeS3 = "Object Created" ) -func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) { +func parseEventBridge(event json.RawMessage) ([]Event, error) { source, detailType, detail, err := decodeEventBridgeEnvelope(event) if err != nil { return nil, fmt.Errorf("decode eventbridge: %w", err) @@ -30,10 +30,10 @@ func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) { if err != nil { return nil, fmt.Errorf("build s3 event from eventbridge: %w", err) } - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: s3Event}}, nil + return []Event{{ContentType: ContentTypeS3, Payload: s3Event}}, nil } - return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil + return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil } func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) { diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go b/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go index 165b879ba..e6012ae31 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go @@ -22,7 +22,7 @@ type s3EventBridgeDetail struct { } `json:"object"` } -func eventBridgeUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { +func eventBridgeUnmarshal(event json.RawMessage) ([]Event, error) { var eventBridgeEvent events.EventBridgeEvent if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { return nil, err @@ -39,9 +39,9 @@ func eventBridgeUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { return nil, err } - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: payload}}, nil + return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil } - return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil + return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil } func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go index 9fe9fce11..bc565169d 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse.go +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -26,7 +26,7 @@ const ( eventSourceSNS = "aws:sns" ) -func Parse(event json.RawMessage) ([]ParsedEvent, error) { +func Parse(event json.RawMessage) ([]Event, error) { dec := json.NewDecoder(bytes.NewReader(event)) if err := SkipBrace(dec); err != nil { return nil, err @@ -40,9 +40,9 @@ func Parse(event json.RawMessage) ([]ParsedEvent, error) { switch key { case cloudwatchLogsKey: - return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil case retryKey: - return []ParsedEvent{{ContentType: ContentTypeRetry}}, nil + return []Event{{ContentType: ContentTypeRetry}}, nil case recordsKey: parsed, err := parseRecords(event, dec) if err != nil { @@ -65,7 +65,7 @@ func Parse(event json.RawMessage) ([]ParsedEvent, error) { return nil, errors.New("unsupported event") } -func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, error) { +func parseRecords(event json.RawMessage, dec *json.Decoder) ([]Event, error) { if err := SkipBracket(dec); err != nil { return nil, err } @@ -104,9 +104,9 @@ func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, erro switch eventSource { case eventSourceS3: - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil + return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil case eventSourceKinesis: - return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil + return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil case eventSourceSQS: parsed, err := parseSQS(event) if err != nil { diff --git a/aws/logs_monitoring_go/internal/parsing/parse2.go b/aws/logs_monitoring_go/internal/parsing/parse2.go index 1d3cf0db7..f7cf634ba 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse2.go +++ b/aws/logs_monitoring_go/internal/parsing/parse2.go @@ -24,7 +24,7 @@ type recordsDiscriminator struct { Records json.RawMessage `json:"Records"` } -func ParseUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { +func ParseUnmarshal(event json.RawMessage) ([]Event, error) { var disc eventDiscriminator if err := json.Unmarshal(event, &disc); err != nil { return nil, err @@ -32,10 +32,10 @@ func ParseUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { switch { case disc.AWSLogs != nil: - return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil case disc.Retry != nil: - return []ParsedEvent{{ContentType: ContentTypeRetry}}, nil + return []Event{{ContentType: ContentTypeRetry}}, nil case len(disc.Records) > 0: parsed, err := recordsUnmarshal(event, disc) @@ -55,13 +55,13 @@ func ParseUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { return nil, errors.New("unsupported event") } -func recordsUnmarshal(event json.RawMessage, disc eventDiscriminator) ([]ParsedEvent, error) { +func recordsUnmarshal(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { switch disc.Records[0].EventSource { case eventSourceS3: - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil + return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil case eventSourceKinesis: - return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil + return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil case eventSourceSQS: parsed, err := sqsUnmarshal(event) diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index ea9ca6bc8..91b622fb8 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -11,14 +11,14 @@ import ( "fmt" ) -func parseSNS(event json.RawMessage) ([]ParsedEvent, error) { +func parseSNS(event json.RawMessage) ([]Event, error) { dec := json.NewDecoder(bytes.NewReader(event)) if err := SkipToRecords(dec); err != nil { return nil, fmt.Errorf("skip to records: %w", err) } - var parsed []ParsedEvent + var parsed []Event for dec.More() { var record json.RawMessage if err := dec.Decode(&record); err != nil { @@ -46,11 +46,11 @@ func parseSNS(event json.RawMessage) ([]ParsedEvent, error) { inner := json.RawMessage(message) if isS3(inner) { - parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: inner}) + parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: inner}) continue } - parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: record}) + parsed = append(parsed, Event{ContentType: ContentTypeSNS, Payload: record}) } return parsed, nil diff --git a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go index 43b00ca91..0cee7b5b1 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go +++ b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go @@ -11,21 +11,21 @@ import ( "github.com/aws/aws-lambda-go/events" ) -func snsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { +func snsUnmarshal(event json.RawMessage) ([]Event, error) { var snsEvent events.SNSEvent if err := json.Unmarshal(event, &snsEvent); err != nil { return nil, err } - var parsed []ParsedEvent + var parsed []Event for _, record := range snsEvent.Records { inner := record.SNS.Message if err := json.Unmarshal([]byte(inner), &recordsDiscriminator{}); err != nil { - parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: json.RawMessage(inner)}) + parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: json.RawMessage(inner)}) continue } - parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: event}) + parsed = append(parsed, Event{ContentType: ContentTypeSNS, Payload: event}) } return parsed, nil diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index bee33f8f9..928558ab3 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -13,13 +13,13 @@ import ( "log/slog" ) -func parseSQS(event json.RawMessage) ([]ParsedEvent, error) { +func parseSQS(event json.RawMessage) ([]Event, error) { dec := json.NewDecoder(bytes.NewReader(event)) if err := SkipToRecords(dec); err != nil { return nil, fmt.Errorf("skip to records: %w", err) } - var parsed []ParsedEvent + var parsed []Event for i := 0; dec.More(); i++ { body, err := extractBody(dec) if err != nil { @@ -65,38 +65,38 @@ func extractBody(dec *json.Decoder) (string, error) { return body, nil } -func parseSQSBody(body string) (ParsedEvent, error) { +func parseSQSBody(body string) (Event, error) { inner := json.RawMessage(body) dec := json.NewDecoder(bytes.NewReader(inner)) if err := SkipBrace(dec); err != nil { - return ParsedEvent{}, err + return Event{}, err } var typ, message string for dec.More() { key, err := dec.Token() if err != nil { - return ParsedEvent{}, err + return Event{}, err } switch key { case "Type": if err := dec.Decode(&typ); err != nil { - return ParsedEvent{}, fmt.Errorf("decode: %w", err) + return Event{}, fmt.Errorf("decode: %w", err) } case "Message": if err := dec.Decode(&message); err != nil { - return ParsedEvent{}, fmt.Errorf("decode: %w", err) + return Event{}, fmt.Errorf("decode: %w", err) } case recordsKey: if isS3(inner) { - return ParsedEvent{ContentType: ContentTypeS3, Payload: inner}, nil + return Event{ContentType: ContentTypeS3, Payload: inner}, nil } - return ParsedEvent{}, errUnknownEvent + return Event{}, errUnknownEvent default: if err := Skip(dec); err != nil { - return ParsedEvent{}, err + return Event{}, err } } } @@ -104,10 +104,10 @@ func parseSQSBody(body string) (ParsedEvent, error) { if typ == "Notification" && message != "" { msg := json.RawMessage(message) if isS3(msg) { - return ParsedEvent{ContentType: ContentTypeS3, Payload: msg}, nil + return Event{ContentType: ContentTypeS3, Payload: msg}, nil } - return ParsedEvent{ContentType: ContentTypeSNS, Payload: inner}, nil + return Event{ContentType: ContentTypeSNS, Payload: inner}, nil } - return ParsedEvent{}, errUnknownEvent + return Event{}, errUnknownEvent } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go index e27b65a85..c21cb0082 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go @@ -17,13 +17,13 @@ type sqsBodyDiscriminator struct { Records json.RawMessage `json:"Records"` // S3 inside SQS } -func sqsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { +func sqsUnmarshal(event json.RawMessage) ([]Event, error) { var sqsEvent events.SQSEvent if err := json.Unmarshal(event, &sqsEvent); err != nil { return nil, err } - var parsed []ParsedEvent + var parsed []Event for _, msg := range sqsEvent.Records { pe, err := sqsBody(msg.Body) if err != nil { @@ -37,26 +37,26 @@ func sqsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) { return parsed, nil } -func sqsBody(body string) (ParsedEvent, error) { +func sqsBody(body string) (Event, error) { raw := json.RawMessage(body) var disc sqsBodyDiscriminator if err := json.Unmarshal(raw, &disc); err != nil { - return ParsedEvent{}, err + return Event{}, err } switch { case disc.Records != nil: - return ParsedEvent{ContentType: ContentTypeS3, Payload: raw}, nil + return Event{ContentType: ContentTypeS3, Payload: raw}, nil case disc.Type == "Notification" && disc.Message != "": var innerDisc recordsDiscriminator if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err != nil { - return ParsedEvent{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil + return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } - return ParsedEvent{ContentType: ContentTypeSNS, Payload: raw}, nil + return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return ParsedEvent{}, errUnknownEvent + return Event{}, errUnknownEvent } From 9681b7125e71f1850d8c3f2a24b1bee8c138e506 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 17 Jun 2026 16:02:17 +0200 Subject: [PATCH 3/8] lets reuse what we can --- aws/logs_monitoring_go/internal/parsing/parse2.go | 3 --- .../internal/parsing/sns_marshal.go | 8 +++++--- .../internal/parsing/sqs_marshal.go | 12 ++++++------ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/parse2.go b/aws/logs_monitoring_go/internal/parsing/parse2.go index f7cf634ba..960f96ab7 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse2.go +++ b/aws/logs_monitoring_go/internal/parsing/parse2.go @@ -20,9 +20,6 @@ type eventDiscriminator struct { Retry json.RawMessage `json:"retry"` } -type recordsDiscriminator struct { - Records json.RawMessage `json:"Records"` -} func ParseUnmarshal(event json.RawMessage) ([]Event, error) { var disc eventDiscriminator diff --git a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go index 0cee7b5b1..be977b651 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go +++ b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go @@ -19,9 +19,11 @@ func snsUnmarshal(event json.RawMessage) ([]Event, error) { var parsed []Event for _, record := range snsEvent.Records { - inner := record.SNS.Message - if err := json.Unmarshal([]byte(inner), &recordsDiscriminator{}); err != nil { - parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: json.RawMessage(inner)}) + inner := json.RawMessage(record.SNS.Message) + + var disc eventDiscriminator + if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { + parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: inner}) continue } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go index c21cb0082..b493eecd1 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go @@ -12,9 +12,9 @@ import ( ) type sqsBodyDiscriminator struct { - Type string `json:"Type"` // SNS inside SQS - Message string `json:"Message"` // SNS inside SQS - Records json.RawMessage `json:"Records"` // S3 inside SQS + Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + eventDiscriminator } func sqsUnmarshal(event json.RawMessage) ([]Event, error) { @@ -46,12 +46,12 @@ func sqsBody(body string) (Event, error) { } switch { - case disc.Records != nil: + case len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3: return Event{ContentType: ContentTypeS3, Payload: raw}, nil case disc.Type == "Notification" && disc.Message != "": - var innerDisc recordsDiscriminator - if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err != nil { + var innerDisc eventDiscriminator + if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } From e9879565df41aad2335c48b1839c27e4821bec88 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 17 Jun 2026 16:02:32 +0200 Subject: [PATCH 4/8] fixup! lets reuse what we can --- .../internal/parsing/bench_test.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/bench_test.go b/aws/logs_monitoring_go/internal/parsing/bench_test.go index 631f7a887..8253c29aa 100644 --- a/aws/logs_monitoring_go/internal/parsing/bench_test.go +++ b/aws/logs_monitoring_go/internal/parsing/bench_test.go @@ -13,7 +13,7 @@ import ( var ( benchCloudWatch = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) benchS3 = json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`) - benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`) + benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`) benchSQSS3 = json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"},` + @@ -27,16 +27,16 @@ var ( `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b10\"},\"object\":{\"key\":\"k10\"}}}]}"}` + `]}`) benchSQSSNSS3 = json.RawMessage(`{"Records":[` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` + `]}`) benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`) ) From dffff4dc3623b81e44f19e2dae7b375920bfca48 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 17 Jun 2026 17:47:23 +0200 Subject: [PATCH 5/8] final --- aws/logs_monitoring_go/internal/pipeline/pipeline.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 2148a6c9b..33e6df027 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -42,9 +42,9 @@ func New( func (p *Pipeline) Start( ctx context.Context, - parsedEvents []parsing.ParsedEvent, + events []parsing.Event, ) error { - contentType := parsedEvents[0].ContentType + contentType := events[0].ContentType if contentType == parsing.ContentTypeRetry { if err := p.forwarder.Retry(ctx); err != nil { return fmt.Errorf("retry: %w", err) @@ -58,7 +58,7 @@ func (p *Pipeline) Start( eg.Go(func() error { defer close(entries) - for _, parsedEvent := range parsedEvents { + for _, parsedEvent := range events { handler, err := handling.NewHandler(ctx, p.hcfg, p.scrubber, p.filterer, parsedEvent.ContentType) if err != nil { return fmt.Errorf("new handler: %w", err) From cfcc3bfe1e5251f21e58a1dc9dcfc19fc8e74a49 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 18 Jun 2026 10:10:52 +0200 Subject: [PATCH 6/8] Delete old implementation, non-used key not found and move skipXXX --- .../internal/handling/cloudtrail.go | 85 ++++++-- .../internal/handling/eventbridge.go | 21 +- .../internal/parsing/bench_test.go | 54 +---- .../internal/parsing/error.go | 26 --- .../internal/parsing/eventbridge.go | 131 +++--------- .../internal/parsing/eventbridge_unmarshal.go | 61 ------ .../internal/parsing/parse.go | 198 ++++-------------- .../internal/parsing/parse2.go | 80 ------- .../internal/parsing/parse_test.go | 16 +- .../internal/parsing/sns.go | 64 ++---- .../internal/parsing/sns_marshal.go | 34 --- .../internal/parsing/sqs.go | 115 +++------- .../internal/parsing/sqs_marshal.go | 62 ------ 13 files changed, 204 insertions(+), 743 deletions(-) delete mode 100644 aws/logs_monitoring_go/internal/parsing/error.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/parse2.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/sns_marshal.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/sqs_marshal.go diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 96f1274b5..0ba3058ae 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -10,11 +10,7 @@ import ( "fmt" "io" "iter" - "log/slog" "regexp" - "strings" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" ) var ( @@ -25,7 +21,7 @@ var ( func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return func(yield func(string, error) bool) { dec := json.NewDecoder(r) - if err := parsing.SkipToRecords(dec); err != nil { + if err := skipToRecords(dec); err != nil { yield("", fmt.Errorf("cloudtrail: %w", err)) return } @@ -43,33 +39,76 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { } } -func cloudtrailHost(message string) (host string) { - dec := json.NewDecoder(strings.NewReader(message)) - if err := parsing.SkipBrace(dec); err != nil { - return +func cloudtrailHost(message string) string { + var record struct { + UserIdentity struct { + ARN string `json:"arn"` + } `json:"userIdentity"` + } + if err := json.Unmarshal([]byte(message), &record); err != nil { + return "" } - if err := parsing.SkipToKey(dec, "userIdentity"); err != nil { - return + matches := ec2InstanceRegexp.FindStringSubmatch(record.UserIdentity.ARN) + if matches != nil { + return matches[ec2InstanceRegexp.SubexpIndex("host")] } + return "" +} - if err := parsing.SkipBrace(dec); err != nil { - return +func skipBrace(dec *json.Decoder) error { + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return fmt.Errorf("expected '{': %w", err) } + return nil +} - if err := parsing.SkipToKey(dec, "arn"); err != nil { - return +func skipBracket(dec *json.Decoder) error { + if t, err := dec.Token(); err != nil || t != json.Delim('[') { + return fmt.Errorf("expected '[': %w", err) } + return nil +} + +func skipToKey(dec *json.Decoder, key string) error { + for dec.More() { + k, err := dec.Token() + if err != nil { + return err + } + + if k != key { + if err := skip(dec); err != nil { + return err + } + continue + } - var arn string - if err := dec.Decode(&arn); err != nil { - return + return nil } - matches := ec2InstanceRegexp.FindStringSubmatch(arn) - if matches != nil { - host = matches[ec2InstanceRegexp.SubexpIndex("host")] - slog.Debug("ec2 host found in userIdentity.arn") + return fmt.Errorf("key not found %q", key) +} + +func skipToRecords(dec *json.Decoder) error { + if err := skipBrace(dec); err != nil { + return err + } + + if err := skipToKey(dec, "Records"); err != nil { + return err + } + + if err := skipBracket(dec); err != nil { + return err + } + return nil +} + +func skip(dec *json.Decoder) error { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return fmt.Errorf("skip: %w", err) } - return + return nil } diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge.go b/aws/logs_monitoring_go/internal/handling/eventbridge.go index 1ae54ce18..64f5c410a 100644 --- a/aws/logs_monitoring_go/internal/handling/eventbridge.go +++ b/aws/logs_monitoring_go/internal/handling/eventbridge.go @@ -6,17 +6,16 @@ package handling import ( - "bytes" "cmp" "context" "encoding/json" + "errors" "fmt" "strings" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" ) @@ -42,7 +41,7 @@ func (h *eventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, source, err := eventBridgeSource(event) if err != nil { - return fmt.Errorf("source: %w", err) + return err } switch source { @@ -97,21 +96,19 @@ func (h *eventBridgeHandler) newEntry(source string, lambdaOrigin model.LambdaOr } func eventBridgeSource(event json.RawMessage) (string, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := parsing.SkipBrace(dec); err != nil { - return "", err + var s struct { + Source string `json:"source"` } - if err := parsing.SkipToKey(dec, "source"); err != nil { - return "", err + if err := json.Unmarshal(event, &s); err != nil { + return "", fmt.Errorf("unmarshal: %w", err) } - var rawSource string - if err := dec.Decode(&rawSource); err != nil { - return "", fmt.Errorf("decode: %w", err) + if s.Source == "" { + return "", errors.New("eventbridge source not found") } - _, source, found := strings.Cut(rawSource, ".") + _, source, found := strings.Cut(s.Source, ".") if found { return source, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/bench_test.go b/aws/logs_monitoring_go/internal/parsing/bench_test.go index 8253c29aa..c48644bc7 100644 --- a/aws/logs_monitoring_go/internal/parsing/bench_test.go +++ b/aws/logs_monitoring_go/internal/parsing/bench_test.go @@ -41,86 +41,44 @@ var ( benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`) ) -func BenchmarkParse_Decoder_CloudWatch(b *testing.B) { +func BenchmarkParse_CloudWatch(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchCloudWatch) } } -func BenchmarkParse_Unmarshal_CloudWatch(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchCloudWatch) - } -} - -func BenchmarkParse_Decoder_S3(b *testing.B) { +func BenchmarkParse_S3(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchS3) } } -func BenchmarkParse_Unmarshal_S3(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchS3) - } -} - -func BenchmarkParse_Decoder_SNS_S3(b *testing.B) { +func BenchmarkParse_SNS_S3(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchSNSS3) } } -func BenchmarkParse_Unmarshal_SNS_S3(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchSNSS3) - } -} - -func BenchmarkParse_Decoder_SQS_S3(b *testing.B) { +func BenchmarkParse_SQS_S3(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchSQSS3) } } -func BenchmarkParse_Unmarshal_SQS_S3(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchSQSS3) - } -} - -func BenchmarkParse_Decoder_SQS_SNS_S3(b *testing.B) { +func BenchmarkParse_SQS_SNS_S3(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchSQSSNSS3) } } -func BenchmarkParse_Unmarshal_SQS_SNS_S3(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchSQSSNSS3) - } -} - -func BenchmarkParse_Decoder_EventBridge(b *testing.B) { +func BenchmarkParse_EventBridge(b *testing.B) { b.ReportAllocs() for b.Loop() { _, _ = Parse(benchEventBridge) } } - -func BenchmarkParse_Unmarshal_EventBridge(b *testing.B) { - b.ReportAllocs() - for b.Loop() { - _, _ = ParseUnmarshal(benchEventBridge) - } -} diff --git a/aws/logs_monitoring_go/internal/parsing/error.go b/aws/logs_monitoring_go/internal/parsing/error.go deleted file mode 100644 index ac317c873..000000000 --- a/aws/logs_monitoring_go/internal/parsing/error.go +++ /dev/null @@ -1,26 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "errors" - "fmt" -) - -var errUnknownEvent = errors.New("unknown event") - -type KeyNotFoundError struct { - Key string -} - -func (e *KeyNotFoundError) Error() string { - return fmt.Sprintf("%s key not found", e.Key) -} - -func (e *KeyNotFoundError) Is(target error) bool { - _, ok := target.(*KeyNotFoundError) - return ok -} diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go index 78ab79116..823fd8e23 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -6,7 +6,6 @@ package parsing import ( - "bytes" "encoding/json" "fmt" "strings" @@ -14,121 +13,49 @@ import ( "github.com/aws/aws-lambda-go/events" ) -const ( - eventBridgeSourceS3 = "aws.s3" - eventBridgeDetailTypeS3 = "Object Created" -) +type s3EventBridgeDetail struct { + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + } `json:"object"` +} -func parseEventBridge(event json.RawMessage) ([]Event, error) { - source, detailType, detail, err := decodeEventBridgeEnvelope(event) - if err != nil { - return nil, fmt.Errorf("decode eventbridge: %w", err) +func eventBridge(event json.RawMessage) ([]Event, error) { + var eventBridgeEvent events.EventBridgeEvent + if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - if source == eventBridgeSourceS3 && strings.Contains(detailType, eventBridgeDetailTypeS3) { - s3Event, err := buildS3EventFromEventBridge(detail) - if err != nil { - return nil, fmt.Errorf("build s3 event from eventbridge: %w", err) + if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { + var s3eb s3EventBridgeDetail + if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - return []Event{{ContentType: ContentTypeS3, Payload: s3Event}}, nil - } - - return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil -} - -func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipBrace(dec); err != nil { - return "", "", nil, err - } - for dec.More() { - key, err := dec.Token() + payload, err := mapS3EventBridge(s3eb) if err != nil { - return "", "", nil, fmt.Errorf("read key: %w", err) + return nil, err } - switch key { - case "source": - if err := dec.Decode(&source); err != nil { - return "", "", nil, fmt.Errorf("source: %w", err) - } - case "detail-type": - if err := dec.Decode(&detailType); err != nil { - return "", "", nil, fmt.Errorf("detail-type: %w", err) - } - case "detail": - if err := dec.Decode(&detail); err != nil { - return "", "", nil, fmt.Errorf("detail: %w", err) - } - default: - if err := Skip(dec); err != nil { - return "", "", nil, err - } - } + return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil } - - return source, detailType, detail, nil + return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil } -func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error) { - bucketName, objectKey, err := decodeEventBridgeS3Detail(detail) - if err != nil { - return nil, fmt.Errorf("decode: %w", err) - } - - s3Event := events.S3Event{ - Records: []events.S3EventRecord{{ - EventSource: eventSourceS3, - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: bucketName}, - Object: events.S3Object{Key: objectKey, URLDecodedKey: objectKey}, - }, - }}, +func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { + s3EventRecord := events.S3EventRecord{ + EventSource: eventSourceS3, + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: eb.Bucket.Name}, + Object: events.S3Object{Key: eb.Object.Key, URLDecodedKey: eb.Object.Key}, + }, } - payload, err := json.Marshal(s3Event) + payload, err := json.Marshal(s3EventRecord) if err != nil { return nil, fmt.Errorf("marshal: %w", err) } - return payload, nil -} - -func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) { - dec := json.NewDecoder(bytes.NewReader(detail)) - - if err := SkipBrace(dec); err != nil { - return "", "", err - } - for dec.More() { - k, err := dec.Token() - if err != nil { - return "", "", fmt.Errorf("read key: %w", err) - } - - switch k { - case "bucket": - var b struct { - Name string `json:"name"` - } - if err := dec.Decode(&b); err != nil { - return "", "", fmt.Errorf("bucket: %w", err) - } - bucket = b.Name - case "object": - var o struct { - Key string `json:"key"` - } - if err := dec.Decode(&o); err != nil { - return "", "", fmt.Errorf("object: %w", err) - } - key = o.Key - default: - if err := Skip(dec); err != nil { - return "", "", err - } - } - } - - return bucket, key, nil + return payload, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go b/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go deleted file mode 100644 index e6012ae31..000000000 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge_unmarshal.go +++ /dev/null @@ -1,61 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - "fmt" - "strings" - - "github.com/aws/aws-lambda-go/events" -) - -type s3EventBridgeDetail struct { - Bucket struct { - Name string `json:"name"` - } `json:"bucket"` - Object struct { - Key string `json:"key"` - } `json:"object"` -} - -func eventBridgeUnmarshal(event json.RawMessage) ([]Event, error) { - var eventBridgeEvent events.EventBridgeEvent - if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { - return nil, err - } - - if eventBridgeEvent.Source == eventSourceS3 && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { - var s3eb s3EventBridgeDetail - if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { - return nil, err - } - - payload, err := mapS3EventBridge(s3eb) - if err != nil { - return nil, err - } - - return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil - } - return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil -} - -func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { - s3EventRecord := events.S3EventRecord{ - EventSource: eventSourceS3, - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: eb.Bucket.Name}, - Object: events.S3Object{Key: eb.Object.Key, URLDecodedKey: eb.Object.Key}, - }, - } - payload, err := json.Marshal(s3EventRecord) - if err != nil { - return nil, fmt.Errorf("marshal: %w", err) - } - - return payload, nil -} diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go index bc565169d..537771cd6 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse.go +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -6,193 +6,81 @@ package parsing import ( - "bytes" "encoding/json" "errors" "fmt" - "strings" ) const ( - cloudwatchLogsKey = "awslogs" - detailKey = "detail" - eventSourceKey = "eventSource" - recordsKey = "Records" - retryKey = "retry" - eventSourceS3 = "aws:s3" eventSourceKinesis = "aws:kinesis" eventSourceSQS = "aws:sqs" eventSourceSNS = "aws:sns" ) -func Parse(event json.RawMessage) ([]Event, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipBrace(dec); err != nil { - return nil, err - } - - for dec.More() { - key, err := dec.Token() - if err != nil { - return nil, err - } - - switch key { - case cloudwatchLogsKey: - return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil - case retryKey: - return []Event{{ContentType: ContentTypeRetry}}, nil - case recordsKey: - parsed, err := parseRecords(event, dec) - if err != nil { - return nil, fmt.Errorf("records: %w", err) - } - return parsed, nil - case detailKey: - parsed, err := parseEventBridge(event) - if err != nil { - return nil, fmt.Errorf("eventbridge: %w", err) - } - return parsed, nil - default: - if err := Skip(dec); err != nil { - return nil, err - } - } - } - - return nil, errors.New("unsupported event") +type eventDiscriminator struct { + AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs + Records []struct { + EventSource string `json:"eventSource"` + } `json:"Records"` // S3, SQS, SNS + Detail json.RawMessage `json:"detail"` // EventBridge + Retry json.RawMessage `json:"retry"` } -func parseRecords(event json.RawMessage, dec *json.Decoder) ([]Event, error) { - if err := SkipBracket(dec); err != nil { - return nil, err - } - if err := SkipBrace(dec); err != nil { - return nil, err +func Parse(event json.RawMessage) ([]Event, error) { + var disc eventDiscriminator + if err := json.Unmarshal(event, &disc); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - for dec.More() { - key, err := dec.Token() - if err != nil { - return nil, err - } + switch { + case disc.AWSLogs != nil: + return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil - keyStr, ok := key.(string) - if !ok { - return nil, fmt.Errorf("expected string key, got %T", key) - } + case disc.Retry != nil: + return []Event{{ContentType: ContentTypeRetry}}, nil - // SNS uses "EventSource" so we compare case-insensitively. - if !strings.EqualFold(keyStr, eventSourceKey) { - if err := Skip(dec); err != nil { - return nil, err - } - continue - } - - val, err := dec.Token() + case len(disc.Records) > 0: + parsed, err := records(event, disc) if err != nil { - return nil, err - } - - eventSource, ok := val.(string) - if !ok { - return nil, fmt.Errorf("eventSource value should be a string, got %T", val) + return nil, fmt.Errorf("records: %w", err) } + return parsed, nil - switch eventSource { - case eventSourceS3: - return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil - case eventSourceKinesis: - return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil - case eventSourceSQS: - parsed, err := parseSQS(event) - if err != nil { - return nil, fmt.Errorf("sqs: %w", err) - } - return parsed, nil - case eventSourceSNS: - parsed, err := parseSNS(event) - if err != nil { - return nil, fmt.Errorf("sns: %w", err) - } - return parsed, nil - default: - return nil, fmt.Errorf("unsupported event source %q", eventSource) + case disc.Detail != nil: + parsed, err := eventBridge(event) + if err != nil { + return nil, fmt.Errorf("eventbridge: %w", err) } + return parsed, nil } - return nil, errors.New("no eventSource found") + return nil, errors.New("unsupported event") } -func SkipBrace(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('{') { - return fmt.Errorf("expected '{': %w", err) - } - return nil -} +func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { + switch disc.Records[0].EventSource { + case eventSourceS3: + return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil -func SkipBracket(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('[') { - return fmt.Errorf("expected '[': %w", err) - } - return nil -} + case eventSourceKinesis: + return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil -func SkipToKey(dec *json.Decoder, key string) error { - for dec.More() { - k, err := dec.Token() + case eventSourceSQS: + parsed, err := sqs(event) if err != nil { - return err + return nil, fmt.Errorf("sqs: %w", err) } + return parsed, nil - if k != key { - if err := Skip(dec); err != nil { - return err - } - continue + case eventSourceSNS: + parsed, err := sns(event) + if err != nil { + return nil, fmt.Errorf("sns: %w", err) } + return parsed, nil - return nil - } - - return &KeyNotFoundError{key} -} - -func SkipToRecords(dec *json.Decoder) error { - if err := SkipBrace(dec); err != nil { - return err - } - - if err := SkipToKey(dec, recordsKey); err != nil { - return err - } - - if err := SkipBracket(dec); err != nil { - return err - } - return nil -} - -func Skip(dec *json.Decoder) error { - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return fmt.Errorf("skip: %w", err) - } - return nil -} - -func skipToEnd(dec *json.Decoder) error { - for dec.More() { - if _, err := dec.Token(); err != nil { - return err - } - if err := Skip(dec); err != nil { - return err - } + default: + return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) } - _, err := dec.Token() - return err } diff --git a/aws/logs_monitoring_go/internal/parsing/parse2.go b/aws/logs_monitoring_go/internal/parsing/parse2.go deleted file mode 100644 index 960f96ab7..000000000 --- a/aws/logs_monitoring_go/internal/parsing/parse2.go +++ /dev/null @@ -1,80 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - "errors" - "fmt" -) - -type eventDiscriminator struct { - AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs - Records []struct { - EventSource string `json:"eventSource"` - } `json:"Records"` // S3, SQS, SNS - Detail json.RawMessage `json:"detail"` // EventBridge - Retry json.RawMessage `json:"retry"` -} - - -func ParseUnmarshal(event json.RawMessage) ([]Event, error) { - var disc eventDiscriminator - if err := json.Unmarshal(event, &disc); err != nil { - return nil, err - } - - switch { - case disc.AWSLogs != nil: - return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil - - case disc.Retry != nil: - return []Event{{ContentType: ContentTypeRetry}}, nil - - case len(disc.Records) > 0: - parsed, err := recordsUnmarshal(event, disc) - if err != nil { - return nil, fmt.Errorf("records: %w", err) - } - return parsed, nil - - case disc.Detail != nil: - parsed, err := eventBridgeUnmarshal(event) - if err != nil { - return nil, fmt.Errorf("eventbridge: %w", err) - } - return parsed, nil - } - - return nil, errors.New("unsupported event") -} - -func recordsUnmarshal(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { - switch disc.Records[0].EventSource { - case eventSourceS3: - return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil - - case eventSourceKinesis: - return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil - - case eventSourceSQS: - parsed, err := sqsUnmarshal(event) - if err != nil { - return nil, fmt.Errorf("sqs: %w", err) - } - return parsed, nil - - case eventSourceSNS: - parsed, err := snsUnmarshal(event) - if err != nil { - return nil, fmt.Errorf("sns: %w", err) - } - return parsed, nil - - default: - return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) - } -} diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go index 3b7efb633..2566d5604 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -50,7 +50,7 @@ func TestParse(t *testing.T) { want: []ContentType{ContentTypeEventBridge}, }, "sns with s3": { - event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), + event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), want: []ContentType{ContentTypeS3}, }, "sns standalone": { @@ -58,17 +58,17 @@ func TestParse(t *testing.T) { want: []ContentType{ContentTypeSNS}, }, "sqs with direct s3": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with sns s3": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with multiple records": { event: json.RawMessage(`{"Records":[` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + `]}`), want: []ContentType{ContentTypeS3, ContentTypeS3}, }, @@ -83,12 +83,12 @@ func TestParse(t *testing.T) { "sqs mixed valid and unrecognized": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + `]}`), - want: []ContentType{ContentTypeS3}, + wantErr: true, }, "sqs with extra fields after body": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with malformed body json": { diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index 91b622fb8..19331edb6 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -6,66 +6,30 @@ package parsing import ( - "bytes" "encoding/json" "fmt" -) -func parseSNS(event json.RawMessage) ([]Event, error) { - dec := json.NewDecoder(bytes.NewReader(event)) + "github.com/aws/aws-lambda-go/events" +) - if err := SkipToRecords(dec); err != nil { - return nil, fmt.Errorf("skip to records: %w", err) +func sns(event json.RawMessage) ([]Event, error) { + var snsEvent events.SNSEvent + if err := json.Unmarshal(event, &snsEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - var parsed []Event - for dec.More() { - var record json.RawMessage - if err := dec.Decode(&record); err != nil { - return nil, fmt.Errorf("decode: %w", err) - } + var events []Event + for _, record := range snsEvent.Records { + inner := json.RawMessage(record.SNS.Message) - recDec := json.NewDecoder(bytes.NewReader(record)) - if err := SkipBrace(recDec); err != nil { - return nil, err - } - if err := SkipToKey(recDec, "Sns"); err != nil { - return nil, fmt.Errorf("skip to key: %w", err) - } - if err := SkipBrace(recDec); err != nil { - return nil, err - } - if err := SkipToKey(recDec, "Message"); err != nil { - return nil, fmt.Errorf("skip to key: %w", err) - } - - var message string - if err := recDec.Decode(&message); err != nil { - return nil, fmt.Errorf("decode: %w", err) - } - - inner := json.RawMessage(message) - if isS3(inner) { - parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: inner}) + var disc eventDiscriminator + if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { + events = append(events, Event{ContentType: ContentTypeS3, Payload: inner}) continue } - parsed = append(parsed, Event{ContentType: ContentTypeSNS, Payload: record}) - } - - return parsed, nil -} - -func isS3(message json.RawMessage) bool { - dec := json.NewDecoder(bytes.NewReader(message)) - - if err := SkipToRecords(dec); err != nil { - return false - } - - if err := SkipBrace(dec); err != nil { - return false + events = append(events, Event{ContentType: ContentTypeSNS, Payload: event}) } - return SkipToKey(dec, "s3") == nil + return events, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go b/aws/logs_monitoring_go/internal/parsing/sns_marshal.go deleted file mode 100644 index be977b651..000000000 --- a/aws/logs_monitoring_go/internal/parsing/sns_marshal.go +++ /dev/null @@ -1,34 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - - "github.com/aws/aws-lambda-go/events" -) - -func snsUnmarshal(event json.RawMessage) ([]Event, error) { - var snsEvent events.SNSEvent - if err := json.Unmarshal(event, &snsEvent); err != nil { - return nil, err - } - - var parsed []Event - for _, record := range snsEvent.Records { - inner := json.RawMessage(record.SNS.Message) - - var disc eventDiscriminator - if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { - parsed = append(parsed, Event{ContentType: ContentTypeS3, Payload: inner}) - continue - } - - parsed = append(parsed, Event{ContentType: ContentTypeSNS, Payload: event}) - } - - return parsed, nil -} diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index 928558ab3..99955774a 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -6,108 +6,59 @@ package parsing import ( - "bytes" "encoding/json" "errors" "fmt" - "log/slog" -) - -func parseSQS(event json.RawMessage) ([]Event, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipToRecords(dec); err != nil { - return nil, fmt.Errorf("skip to records: %w", err) - } - - var parsed []Event - for i := 0; dec.More(); i++ { - body, err := extractBody(dec) - if err != nil { - return nil, fmt.Errorf("extract body: %w", err) - } - pe, err := parseSQSBody(body) - if errors.Is(err, errUnknownEvent) { - slog.Warn("unknown event from SQS record, skipping", slog.Int("index", i)) - continue - } - if err != nil { - return nil, fmt.Errorf("parse body: %w", err) - } - - parsed = append(parsed, pe) - } + "github.com/aws/aws-lambda-go/events" +) - if len(parsed) == 0 { - return nil, errors.New("no recognizable events in SQS batch") - } - return parsed, nil +type sqsBodyDiscriminator struct { + Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + eventDiscriminator } -func extractBody(dec *json.Decoder) (string, error) { - if err := SkipBrace(dec); err != nil { - return "", err - } - - if err := SkipToKey(dec, "body"); err != nil { - return "", fmt.Errorf("skip to key: %w", err) +func sqs(event json.RawMessage) ([]Event, error) { + var sqsEvent events.SQSEvent + if err := json.Unmarshal(event, &sqsEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - var body string - if err := dec.Decode(&body); err != nil { - return "", fmt.Errorf("decode: %w", err) - } + var events []Event + for _, msg := range sqsEvent.Records { + e, err := sqsBody(msg.Body) + if err != nil { + return nil, err + } - if err := skipToEnd(dec); err != nil { - return "", fmt.Errorf("skip to end: %w", err) + e.SQSReceiptHandle = msg.ReceiptHandle + events = append(events, e) } - return body, nil + return events, nil } -func parseSQSBody(body string) (Event, error) { - inner := json.RawMessage(body) - dec := json.NewDecoder(bytes.NewReader(inner)) +func sqsBody(body string) (Event, error) { + raw := json.RawMessage(body) - if err := SkipBrace(dec); err != nil { - return Event{}, err + var disc sqsBodyDiscriminator + if err := json.Unmarshal(raw, &disc); err != nil { + return Event{}, fmt.Errorf("unmarshal: %w", err) } - var typ, message string - for dec.More() { - key, err := dec.Token() - if err != nil { - return Event{}, err - } + switch { + case len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3: + return Event{ContentType: ContentTypeS3, Payload: raw}, nil - switch key { - case "Type": - if err := dec.Decode(&typ); err != nil { - return Event{}, fmt.Errorf("decode: %w", err) - } - case "Message": - if err := dec.Decode(&message); err != nil { - return Event{}, fmt.Errorf("decode: %w", err) - } - case recordsKey: - if isS3(inner) { - return Event{ContentType: ContentTypeS3, Payload: inner}, nil - } - return Event{}, errUnknownEvent - default: - if err := Skip(dec); err != nil { - return Event{}, err - } + case disc.Type == "Notification" && disc.Message != "": + var innerDisc eventDiscriminator + if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { + return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } - } - if typ == "Notification" && message != "" { - msg := json.RawMessage(message) - if isS3(msg) { - return Event{ContentType: ContentTypeS3, Payload: msg}, nil - } - return Event{ContentType: ContentTypeSNS, Payload: inner}, nil + return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return Event{}, errUnknownEvent + return Event{}, errors.New("unknown event") } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go b/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go deleted file mode 100644 index b493eecd1..000000000 --- a/aws/logs_monitoring_go/internal/parsing/sqs_marshal.go +++ /dev/null @@ -1,62 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - - "github.com/aws/aws-lambda-go/events" -) - -type sqsBodyDiscriminator struct { - Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html - Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html - eventDiscriminator -} - -func sqsUnmarshal(event json.RawMessage) ([]Event, error) { - var sqsEvent events.SQSEvent - if err := json.Unmarshal(event, &sqsEvent); err != nil { - return nil, err - } - - var parsed []Event - for _, msg := range sqsEvent.Records { - pe, err := sqsBody(msg.Body) - if err != nil { - return nil, err - } - - pe.SQSReceiptHandle = msg.ReceiptHandle - parsed = append(parsed, pe) - } - - return parsed, nil -} - -func sqsBody(body string) (Event, error) { - raw := json.RawMessage(body) - - var disc sqsBodyDiscriminator - if err := json.Unmarshal(raw, &disc); err != nil { - return Event{}, err - } - - switch { - case len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3: - return Event{ContentType: ContentTypeS3, Payload: raw}, nil - - case disc.Type == "Notification" && disc.Message != "": - var innerDisc eventDiscriminator - if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { - return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil - } - - return Event{ContentType: ContentTypeSNS, Payload: raw}, nil - } - - return Event{}, errUnknownEvent -} From 4de8494174a7775604e9912dfa837477d29aa31b Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 18 Jun 2026 10:35:32 +0200 Subject: [PATCH 7/8] delete parsing helpers: go straight. --- .../internal/handling/cloudtrail.go | 88 ++++++------------- 1 file changed, 28 insertions(+), 60 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 0ba3058ae..2e31cc3d6 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -21,15 +21,40 @@ var ( func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return func(yield func(string, error) bool) { dec := json.NewDecoder(r) - if err := skipToRecords(dec); err != nil { - yield("", fmt.Errorf("cloudtrail: %w", err)) + t, err := dec.Token() + if err != nil { + yield("", err) + return + } + if t != json.Delim('{') { + yield("", fmt.Errorf("expected \"{\" token, got %q", t)) + return + } + + t, err = dec.Token() + if err != nil { + yield("", err) + return + } + if t != "Records" { + yield("", fmt.Errorf("expected \"Records\" token, got %q", t)) + return + } + + t, err = dec.Token() + if err != nil { + yield("", err) + return + } + if t != json.Delim('[') { + yield("", fmt.Errorf("expected \"[\" token, got %q", t)) return } for dec.More() { var raw json.RawMessage if err := dec.Decode(&raw); err != nil { - yield("", fmt.Errorf("decode cloudtrail record: %w", err)) + yield("", fmt.Errorf("decode: %w", err)) return } if !yield(string(raw), nil) { @@ -55,60 +80,3 @@ func cloudtrailHost(message string) string { } return "" } - -func skipBrace(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('{') { - return fmt.Errorf("expected '{': %w", err) - } - return nil -} - -func skipBracket(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('[') { - return fmt.Errorf("expected '[': %w", err) - } - return nil -} - -func skipToKey(dec *json.Decoder, key string) error { - for dec.More() { - k, err := dec.Token() - if err != nil { - return err - } - - if k != key { - if err := skip(dec); err != nil { - return err - } - continue - } - - return nil - } - - return fmt.Errorf("key not found %q", key) -} - -func skipToRecords(dec *json.Decoder) error { - if err := skipBrace(dec); err != nil { - return err - } - - if err := skipToKey(dec, "Records"); err != nil { - return err - } - - if err := skipBracket(dec); err != nil { - return err - } - return nil -} - -func skip(dec *json.Decoder) error { - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return fmt.Errorf("skip: %w", err) - } - return nil -} From a322dbf01a2b27dd5a6f6d08be113e22147e8f6f Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 18 Jun 2026 13:19:04 +0200 Subject: [PATCH 8/8] use backquotes --- aws/logs_monitoring_go/internal/handling/cloudtrail.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 2e31cc3d6..5bf137b2c 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -27,7 +27,7 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return } if t != json.Delim('{') { - yield("", fmt.Errorf("expected \"{\" token, got %q", t)) + yield("", fmt.Errorf(`expected "{" token, got %q`, t)) return } @@ -37,7 +37,7 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return } if t != "Records" { - yield("", fmt.Errorf("expected \"Records\" token, got %q", t)) + yield("", fmt.Errorf(`expected "Records" token, got %q`, t)) return } @@ -47,14 +47,14 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return } if t != json.Delim('[') { - yield("", fmt.Errorf("expected \"[\" token, got %q", t)) + yield("", fmt.Errorf(`expected "[" token, got %q`, t)) return } for dec.More() { var raw json.RawMessage if err := dec.Decode(&raw); err != nil { - yield("", fmt.Errorf("decode: %w", err)) + yield("", fmt.Errorf(`decode: %w`, err)) return } if !yield(string(raw), nil) {