diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 96f1274b5..5bf137b2c 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -10,11 +10,7 @@ import ( "fmt" "io" "iter" - "log/slog" "regexp" - "strings" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" ) var ( @@ -25,15 +21,40 @@ var ( func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return func(yield func(string, error) bool) { dec := json.NewDecoder(r) - if err := parsing.SkipToRecords(dec); err != nil { - yield("", fmt.Errorf("cloudtrail: %w", err)) + t, err := dec.Token() + if err != nil { + yield("", err) + return + } + if t != json.Delim('{') { + yield("", fmt.Errorf(`expected "{" token, got %q`, t)) + return + } + + t, err = dec.Token() + if err != nil { + yield("", err) + return + } + if t != "Records" { + yield("", fmt.Errorf(`expected "Records" token, got %q`, t)) + return + } + + t, err = dec.Token() + if err != nil { + yield("", err) + return + } + if t != json.Delim('[') { + yield("", fmt.Errorf(`expected "[" token, got %q`, t)) return } for dec.More() { var raw json.RawMessage if err := dec.Decode(&raw); err != nil { - yield("", fmt.Errorf("decode cloudtrail record: %w", err)) + yield("", fmt.Errorf(`decode: %w`, err)) return } if !yield(string(raw), nil) { @@ -43,33 +64,19 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { } } -func cloudtrailHost(message string) (host string) { - dec := json.NewDecoder(strings.NewReader(message)) - if err := parsing.SkipBrace(dec); err != nil { - return - } - - if err := parsing.SkipToKey(dec, "userIdentity"); err != nil { - return +func cloudtrailHost(message string) string { + var record struct { + UserIdentity struct { + ARN string `json:"arn"` + } `json:"userIdentity"` } - - if err := parsing.SkipBrace(dec); err != nil { - return - } - - if err := parsing.SkipToKey(dec, "arn"); err != nil { - return - } - - var arn string - if err := dec.Decode(&arn); err != nil { - return + if err := json.Unmarshal([]byte(message), &record); err != nil { + return "" } - matches := ec2InstanceRegexp.FindStringSubmatch(arn) + matches := ec2InstanceRegexp.FindStringSubmatch(record.UserIdentity.ARN) if matches != nil { - host = matches[ec2InstanceRegexp.SubexpIndex("host")] - slog.Debug("ec2 host found in userIdentity.arn") + return matches[ec2InstanceRegexp.SubexpIndex("host")] } - return + return "" } diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge.go b/aws/logs_monitoring_go/internal/handling/eventbridge.go index 1ae54ce18..64f5c410a 100644 --- a/aws/logs_monitoring_go/internal/handling/eventbridge.go +++ b/aws/logs_monitoring_go/internal/handling/eventbridge.go @@ -6,17 +6,16 @@ package handling import ( - "bytes" "cmp" "context" "encoding/json" + "errors" "fmt" "strings" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" ) @@ -42,7 +41,7 @@ func (h *eventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, source, err := eventBridgeSource(event) if err != nil { - return fmt.Errorf("source: %w", err) + return err } switch source { @@ -97,21 +96,19 @@ func (h *eventBridgeHandler) newEntry(source string, lambdaOrigin model.LambdaOr } func eventBridgeSource(event json.RawMessage) (string, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := parsing.SkipBrace(dec); err != nil { - return "", err + var s struct { + Source string `json:"source"` } - if err := parsing.SkipToKey(dec, "source"); err != nil { - return "", err + if err := json.Unmarshal(event, &s); err != nil { + return "", fmt.Errorf("unmarshal: %w", err) } - var rawSource string - if err := dec.Decode(&rawSource); err != nil { - return "", fmt.Errorf("decode: %w", err) + if s.Source == "" { + return "", errors.New("eventbridge source not found") } - _, source, found := strings.Cut(rawSource, ".") + _, source, found := strings.Cut(s.Source, ".") if found { return source, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/bench_test.go b/aws/logs_monitoring_go/internal/parsing/bench_test.go new file mode 100644 index 000000000..c48644bc7 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/bench_test.go @@ -0,0 +1,84 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "testing" +) + +var ( + benchCloudWatch = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) + benchS3 = json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`) + benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`) + benchSQSS3 = json.RawMessage(`{"Records":[` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b3\"},\"object\":{\"key\":\"k3\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b4\"},\"object\":{\"key\":\"k4\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b5\"},\"object\":{\"key\":\"k5\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b6\"},\"object\":{\"key\":\"k6\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b7\"},\"object\":{\"key\":\"k7\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b8\"},\"object\":{\"key\":\"k8\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b9\"},\"object\":{\"key\":\"k9\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b10\"},\"object\":{\"key\":\"k10\"}}}]}"}` + + `]}`) + benchSQSSNSS3 = json.RawMessage(`{"Records":[` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` + + `{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` + + `]}`) + benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`) +) + +func BenchmarkParse_CloudWatch(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchCloudWatch) + } +} + +func BenchmarkParse_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchS3) + } +} + +func BenchmarkParse_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSNSS3) + } +} + +func BenchmarkParse_SQS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSQSS3) + } +} + +func BenchmarkParse_SQS_SNS_S3(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchSQSSNSS3) + } +} + +func BenchmarkParse_EventBridge(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + _, _ = Parse(benchEventBridge) + } +} diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go index d52101100..8818e7d62 100644 --- a/aws/logs_monitoring_go/internal/parsing/content.go +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -20,7 +20,8 @@ const ( ContentTypeRetry ) -type ParsedEvent struct { - ContentType ContentType - Payload json.RawMessage +type Event struct { + ContentType ContentType + Payload json.RawMessage + SQSReceiptHandle string } diff --git a/aws/logs_monitoring_go/internal/parsing/error.go b/aws/logs_monitoring_go/internal/parsing/error.go deleted file mode 100644 index ac317c873..000000000 --- a/aws/logs_monitoring_go/internal/parsing/error.go +++ /dev/null @@ -1,26 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "errors" - "fmt" -) - -var errUnknownEvent = errors.New("unknown event") - -type KeyNotFoundError struct { - Key string -} - -func (e *KeyNotFoundError) Error() string { - return fmt.Sprintf("%s key not found", e.Key) -} - -func (e *KeyNotFoundError) Is(target error) bool { - _, ok := target.(*KeyNotFoundError) - return ok -} diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go index becbb2279..823fd8e23 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -6,7 +6,6 @@ package parsing import ( - "bytes" "encoding/json" "fmt" "strings" @@ -14,121 +13,49 @@ import ( "github.com/aws/aws-lambda-go/events" ) -const ( - eventBridgeSourceS3 = "aws.s3" - eventBridgeDetailTypeS3 = "Object Created" -) +type s3EventBridgeDetail struct { + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + } `json:"object"` +} -func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) { - source, detailType, detail, err := decodeEventBridgeEnvelope(event) - if err != nil { - return nil, fmt.Errorf("decode eventbridge: %w", err) +func eventBridge(event json.RawMessage) ([]Event, error) { + var eventBridgeEvent events.EventBridgeEvent + if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - if source == eventBridgeSourceS3 && strings.Contains(detailType, eventBridgeDetailTypeS3) { - s3Event, err := buildS3EventFromEventBridge(detail) - if err != nil { - return nil, fmt.Errorf("build s3 event from eventbridge: %w", err) + if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { + var s3eb s3EventBridgeDetail + if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: s3Event}}, nil - } - - return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil -} -func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipBrace(dec); err != nil { - return "", "", nil, err - } - - for dec.More() { - key, err := dec.Token() + payload, err := mapS3EventBridge(s3eb) if err != nil { - return "", "", nil, fmt.Errorf("read key: %w", err) + return nil, err } - switch key { - case "source": - if err := dec.Decode(&source); err != nil { - return "", "", nil, fmt.Errorf("source: %w", err) - } - case "detail-type": - if err := dec.Decode(&detailType); err != nil { - return "", "", nil, fmt.Errorf("detail-type: %w", err) - } - case "detail": - if err := dec.Decode(&detail); err != nil { - return "", "", nil, fmt.Errorf("detail: %w", err) - } - default: - if err := Skip(dec); err != nil { - return "", "", nil, err - } - } + return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil } - - return source, detailType, detail, nil + return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil } -func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error) { - bucketName, objectKey, err := decodeEventBridgeS3Detail(detail) - if err != nil { - return nil, fmt.Errorf("decode: %w", err) - } - - s3Event := events.S3Event{ - Records: []events.S3EventRecord{{ - EventSource: eventSourceS3, - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: bucketName}, - Object: events.S3Object{Key: objectKey, URLDecodedKey: objectKey}, - }, - }}, +func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { + s3EventRecord := events.S3EventRecord{ + EventSource: eventSourceS3, + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: eb.Bucket.Name}, + Object: events.S3Object{Key: eb.Object.Key, URLDecodedKey: eb.Object.Key}, + }, } - payload, err := json.Marshal(s3Event) + payload, err := json.Marshal(s3EventRecord) if err != nil { return nil, fmt.Errorf("marshal: %w", err) } - return payload, nil -} - -func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) { - dec := json.NewDecoder(bytes.NewReader(detail)) - if err := SkipBrace(dec); err != nil { - return "", "", err - } - - for dec.More() { - k, err := dec.Token() - if err != nil { - return "", "", fmt.Errorf("read key: %w", err) - } - - switch k { - case "bucket": - var b struct { - Name string `json:"name"` - } - if err := dec.Decode(&b); err != nil { - return "", "", fmt.Errorf("bucket: %w", err) - } - bucket = b.Name - case "object": - var o struct { - Key string `json:"key"` - } - if err := dec.Decode(&o); err != nil { - return "", "", fmt.Errorf("object: %w", err) - } - key = o.Key - default: - if err := Skip(dec); err != nil { - return "", "", err - } - } - } - - return bucket, key, nil + return payload, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go index 9fe9fce11..537771cd6 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse.go +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -6,193 +6,81 @@ package parsing import ( - "bytes" "encoding/json" "errors" "fmt" - "strings" ) const ( - cloudwatchLogsKey = "awslogs" - detailKey = "detail" - eventSourceKey = "eventSource" - recordsKey = "Records" - retryKey = "retry" - eventSourceS3 = "aws:s3" eventSourceKinesis = "aws:kinesis" eventSourceSQS = "aws:sqs" eventSourceSNS = "aws:sns" ) -func Parse(event json.RawMessage) ([]ParsedEvent, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipBrace(dec); err != nil { - return nil, err - } - - for dec.More() { - key, err := dec.Token() - if err != nil { - return nil, err - } - - switch key { - case cloudwatchLogsKey: - return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil - case retryKey: - return []ParsedEvent{{ContentType: ContentTypeRetry}}, nil - case recordsKey: - parsed, err := parseRecords(event, dec) - if err != nil { - return nil, fmt.Errorf("records: %w", err) - } - return parsed, nil - case detailKey: - parsed, err := parseEventBridge(event) - if err != nil { - return nil, fmt.Errorf("eventbridge: %w", err) - } - return parsed, nil - default: - if err := Skip(dec); err != nil { - return nil, err - } - } - } - - return nil, errors.New("unsupported event") +type eventDiscriminator struct { + AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs + Records []struct { + EventSource string `json:"eventSource"` + } `json:"Records"` // S3, SQS, SNS + Detail json.RawMessage `json:"detail"` // EventBridge + Retry json.RawMessage `json:"retry"` } -func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, error) { - if err := SkipBracket(dec); err != nil { - return nil, err - } - if err := SkipBrace(dec); err != nil { - return nil, err +func Parse(event json.RawMessage) ([]Event, error) { + var disc eventDiscriminator + if err := json.Unmarshal(event, &disc); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - for dec.More() { - key, err := dec.Token() - if err != nil { - return nil, err - } + switch { + case disc.AWSLogs != nil: + return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil - keyStr, ok := key.(string) - if !ok { - return nil, fmt.Errorf("expected string key, got %T", key) - } - - // SNS uses "EventSource" so we compare case-insensitively. - if !strings.EqualFold(keyStr, eventSourceKey) { - if err := Skip(dec); err != nil { - return nil, err - } - continue - } + case disc.Retry != nil: + return []Event{{ContentType: ContentTypeRetry}}, nil - val, err := dec.Token() + case len(disc.Records) > 0: + parsed, err := records(event, disc) if err != nil { - return nil, err + return nil, fmt.Errorf("records: %w", err) } + return parsed, nil - eventSource, ok := val.(string) - if !ok { - return nil, fmt.Errorf("eventSource value should be a string, got %T", val) - } - - switch eventSource { - case eventSourceS3: - return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil - case eventSourceKinesis: - return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil - case eventSourceSQS: - parsed, err := parseSQS(event) - if err != nil { - return nil, fmt.Errorf("sqs: %w", err) - } - return parsed, nil - case eventSourceSNS: - parsed, err := parseSNS(event) - if err != nil { - return nil, fmt.Errorf("sns: %w", err) - } - return parsed, nil - default: - return nil, fmt.Errorf("unsupported event source %q", eventSource) + case disc.Detail != nil: + parsed, err := eventBridge(event) + if err != nil { + return nil, fmt.Errorf("eventbridge: %w", err) } + return parsed, nil } - return nil, errors.New("no eventSource found") + return nil, errors.New("unsupported event") } -func SkipBrace(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('{') { - return fmt.Errorf("expected '{': %w", err) - } - return nil -} +func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { + switch disc.Records[0].EventSource { + case eventSourceS3: + return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil -func SkipBracket(dec *json.Decoder) error { - if t, err := dec.Token(); err != nil || t != json.Delim('[') { - return fmt.Errorf("expected '[': %w", err) - } - return nil -} + case eventSourceKinesis: + return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil -func SkipToKey(dec *json.Decoder, key string) error { - for dec.More() { - k, err := dec.Token() + case eventSourceSQS: + parsed, err := sqs(event) if err != nil { - return err + return nil, fmt.Errorf("sqs: %w", err) } + return parsed, nil - if k != key { - if err := Skip(dec); err != nil { - return err - } - continue + case eventSourceSNS: + parsed, err := sns(event) + if err != nil { + return nil, fmt.Errorf("sns: %w", err) } + return parsed, nil - return nil - } - - return &KeyNotFoundError{key} -} - -func SkipToRecords(dec *json.Decoder) error { - if err := SkipBrace(dec); err != nil { - return err - } - - if err := SkipToKey(dec, recordsKey); err != nil { - return err - } - - if err := SkipBracket(dec); err != nil { - return err - } - return nil -} - -func Skip(dec *json.Decoder) error { - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return fmt.Errorf("skip: %w", err) - } - return nil -} - -func skipToEnd(dec *json.Decoder) error { - for dec.More() { - if _, err := dec.Token(); err != nil { - return err - } - if err := Skip(dec); err != nil { - return err - } + default: + return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) } - _, err := dec.Token() - return err } diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go index 3b7efb633..2566d5604 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -50,7 +50,7 @@ func TestParse(t *testing.T) { want: []ContentType{ContentTypeEventBridge}, }, "sns with s3": { - event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), + event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), want: []ContentType{ContentTypeS3}, }, "sns standalone": { @@ -58,17 +58,17 @@ func TestParse(t *testing.T) { want: []ContentType{ContentTypeSNS}, }, "sqs with direct s3": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with sns s3": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with multiple records": { event: json.RawMessage(`{"Records":[` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + `]}`), want: []ContentType{ContentTypeS3, ContentTypeS3}, }, @@ -83,12 +83,12 @@ func TestParse(t *testing.T) { "sqs mixed valid and unrecognized": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` + - `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + `]}`), - want: []ContentType{ContentTypeS3}, + wantErr: true, }, "sqs with extra fields after body": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), + event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), want: []ContentType{ContentTypeS3}, }, "sqs with malformed body json": { diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index 71a68fd95..19331edb6 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -6,66 +6,30 @@ package parsing import ( - "bytes" "encoding/json" "fmt" -) -func parseSNS(event json.RawMessage) ([]ParsedEvent, error) { - dec := json.NewDecoder(bytes.NewReader(event)) + "github.com/aws/aws-lambda-go/events" +) - if err := SkipToRecords(dec); err != nil { - return nil, fmt.Errorf("skip to records: %w", err) +func sns(event json.RawMessage) ([]Event, error) { + var snsEvent events.SNSEvent + if err := json.Unmarshal(event, &snsEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - var parsed []ParsedEvent - for dec.More() { - var record json.RawMessage - if err := dec.Decode(&record); err != nil { - return nil, fmt.Errorf("decode: %w", err) - } + var events []Event + for _, record := range snsEvent.Records { + inner := json.RawMessage(record.SNS.Message) - recDec := json.NewDecoder(bytes.NewReader(record)) - if err := SkipBrace(recDec); err != nil { - return nil, err - } - if err := SkipToKey(recDec, "Sns"); err != nil { - return nil, fmt.Errorf("skip to key: %w", err) - } - if err := SkipBrace(recDec); err != nil { - return nil, err - } - if err := SkipToKey(recDec, "Message"); err != nil { - return nil, fmt.Errorf("skip to key: %w", err) - } - - var message string - if err := recDec.Decode(&message); err != nil { - return nil, fmt.Errorf("decode: %w", err) - } - - inner := json.RawMessage(message) - if isS3(inner) { - parsed = append(parsed, ParsedEvent{ContentTypeS3, inner}) + var disc eventDiscriminator + if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { + events = append(events, Event{ContentType: ContentTypeS3, Payload: inner}) continue } - parsed = append(parsed, ParsedEvent{ContentTypeSNS, record}) - } - - return parsed, nil -} - -func isS3(message json.RawMessage) bool { - dec := json.NewDecoder(bytes.NewReader(message)) - - if err := SkipToRecords(dec); err != nil { - return false - } - - if err := SkipBrace(dec); err != nil { - return false + events = append(events, Event{ContentType: ContentTypeSNS, Payload: event}) } - return SkipToKey(dec, "s3") == nil + return events, nil } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index 2d5bb6964..99955774a 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -6,108 +6,59 @@ package parsing import ( - "bytes" "encoding/json" "errors" "fmt" - "log/slog" -) - -func parseSQS(event json.RawMessage) ([]ParsedEvent, error) { - dec := json.NewDecoder(bytes.NewReader(event)) - if err := SkipToRecords(dec); err != nil { - return nil, fmt.Errorf("skip to records: %w", err) - } - - var parsed []ParsedEvent - for i := 0; dec.More(); i++ { - body, err := extractBody(dec) - if err != nil { - return nil, fmt.Errorf("extract body: %w", err) - } - pe, err := parseSQSBody(body) - if errors.Is(err, errUnknownEvent) { - slog.Warn("unknown event from SQS record, skipping", slog.Int("index", i)) - continue - } - if err != nil { - return nil, fmt.Errorf("parse body: %w", err) - } - - parsed = append(parsed, pe) - } + "github.com/aws/aws-lambda-go/events" +) - if len(parsed) == 0 { - return nil, errors.New("no recognizable events in SQS batch") - } - return parsed, nil +type sqsBodyDiscriminator struct { + Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html + eventDiscriminator } -func extractBody(dec *json.Decoder) (string, error) { - if err := SkipBrace(dec); err != nil { - return "", err - } - - if err := SkipToKey(dec, "body"); err != nil { - return "", fmt.Errorf("skip to key: %w", err) +func sqs(event json.RawMessage) ([]Event, error) { + var sqsEvent events.SQSEvent + if err := json.Unmarshal(event, &sqsEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) } - var body string - if err := dec.Decode(&body); err != nil { - return "", fmt.Errorf("decode: %w", err) - } + var events []Event + for _, msg := range sqsEvent.Records { + e, err := sqsBody(msg.Body) + if err != nil { + return nil, err + } - if err := skipToEnd(dec); err != nil { - return "", fmt.Errorf("skip to end: %w", err) + e.SQSReceiptHandle = msg.ReceiptHandle + events = append(events, e) } - return body, nil + return events, nil } -func parseSQSBody(body string) (ParsedEvent, error) { - inner := json.RawMessage(body) - dec := json.NewDecoder(bytes.NewReader(inner)) +func sqsBody(body string) (Event, error) { + raw := json.RawMessage(body) - if err := SkipBrace(dec); err != nil { - return ParsedEvent{}, err + var disc sqsBodyDiscriminator + if err := json.Unmarshal(raw, &disc); err != nil { + return Event{}, fmt.Errorf("unmarshal: %w", err) } - var typ, message string - for dec.More() { - key, err := dec.Token() - if err != nil { - return ParsedEvent{}, err - } + switch { + case len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3: + return Event{ContentType: ContentTypeS3, Payload: raw}, nil - switch key { - case "Type": - if err := dec.Decode(&typ); err != nil { - return ParsedEvent{}, fmt.Errorf("decode: %w", err) - } - case "Message": - if err := dec.Decode(&message); err != nil { - return ParsedEvent{}, fmt.Errorf("decode: %w", err) - } - case recordsKey: - if isS3(inner) { - return ParsedEvent{ContentTypeS3, inner}, nil - } - return ParsedEvent{}, errUnknownEvent - default: - if err := Skip(dec); err != nil { - return ParsedEvent{}, err - } + case disc.Type == "Notification" && disc.Message != "": + var innerDisc eventDiscriminator + if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { + return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } - } - if typ == "Notification" && message != "" { - msg := json.RawMessage(message) - if isS3(msg) { - return ParsedEvent{ContentTypeS3, msg}, nil - } - return ParsedEvent{ContentTypeSNS, inner}, nil + return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return ParsedEvent{}, errUnknownEvent + return Event{}, errors.New("unknown event") } diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 2148a6c9b..33e6df027 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -42,9 +42,9 @@ func New( func (p *Pipeline) Start( ctx context.Context, - parsedEvents []parsing.ParsedEvent, + events []parsing.Event, ) error { - contentType := parsedEvents[0].ContentType + contentType := events[0].ContentType if contentType == parsing.ContentTypeRetry { if err := p.forwarder.Retry(ctx); err != nil { return fmt.Errorf("retry: %w", err) @@ -58,7 +58,7 @@ func (p *Pipeline) Start( eg.Go(func() error { defer close(entries) - for _, parsedEvent := range parsedEvents { + for _, parsedEvent := range events { handler, err := handling.NewHandler(ctx, p.hcfg, p.scrubber, p.filterer, parsedEvent.ContentType) if err != nil { return fmt.Errorf("new handler: %w", err)