From 803d27ab8fcf2fdcc111d2b15de3f0f82d478076 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 18 Jun 2026 15:15:48 +0200 Subject: [PATCH 1/5] fix(go-forwarder): prevent log duplication on SQS event failure --- aws/logs_monitoring_go/cmd/forwarder/main.go | 51 ++++++++++++++++--- .../internal/forwarding/forwarding.go | 2 +- .../internal/handling/handler.go | 2 +- .../internal/pipeline/pipeline.go | 46 +++-------------- 4 files changed, 53 insertions(+), 48 deletions(-) diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index ac2717ee6..d57f926dd 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -24,6 +24,7 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing" + awsevents "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) @@ -49,15 +50,15 @@ func main() { lambda.Start(handleRequest(cfg)) } -func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { - return func(ctx context.Context, event json.RawMessage) error { - parsed, err := parsing.Parse(event) +func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.RawMessage) (any, error) { + return func(ctx context.Context, awsevent json.RawMessage) (any, error) { + events, err := parsing.Parse(awsevent) if err != nil { - return fmt.Errorf("parse: %w", err) + return nil, fmt.Errorf("parse: %w", err) } - if len(parsed) == 0 { - return errors.New("no events to process") + if len(events) == 0 { + return nil, errors.New("no events to process") } filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude) @@ -80,7 +81,7 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM if cfg.StoreOnFail { storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName} if storage, err = storing.NewStorage(ctx, storageOpts); err != nil { - return fmt.Errorf("new storage: %w", err) + return nil, fmt.Errorf("new storage: %w", err) } } @@ -90,6 +91,40 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM storage, ) - return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, parsed) + for i, event := range events { + if event.ContentType == parsing.ContentTypeRetry { + if err := forwarder.Retry(ctx); err != nil { + return nil, fmt.Errorf("retry: %w", err) + } + return nil, nil + } + + handler, err := handling.NewHandler(handlerCfg, scrubber, filterer, event.ContentType) + if err != nil { + return nil, fmt.Errorf("new handler: %w", err) + } + + err = pipeline.New(handler, forwarder).Start(ctx, event) + if err == nil { + continue + } + + if event.SQSReceiptHandle != "" { + return nil, err + } + + var response awsevents.SQSEventResponse + for _, remaining := range events[i:] { + response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) + } + + sqsBatchResponse, marshallErr := json.Marshal(response) + if marshallErr != nil { + return nil, errors.Join(err, marshallErr) + } + return sqsBatchResponse, err + } + + return nil, nil } } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index c30ee8bbf..bf44b0480 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -122,7 +122,7 @@ func (f *Forwarder) Retry(ctx context.Context) error { for _, key := range keys { payload, storageTag, getErr := f.storage.Get(ctx, key) if getErr != nil { - return fmt.Errorf("list: %w", getErr) + return fmt.Errorf("get: %w", getErr) } if sendErr := f.send(ctx, payload, storageTag); sendErr != nil { diff --git a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go index 92376b70a..5bd1a02c0 100644 --- a/aws/logs_monitoring_go/internal/handling/handler.go +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -30,7 +30,7 @@ type Config struct { S3MultilineLogRegex *regexp.Regexp } -func NewHandler(ctx context.Context, hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) { +func NewHandler(hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) { switch ct { case parsing.ContentTypeCloudwatchLogs: diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 33e6df027..17d47017f 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -10,68 +10,38 @@ import ( "errors" "fmt" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "golang.org/x/sync/errgroup" ) type Pipeline struct { - hcfg handling.Config - scrubber *scrubbing.Scrubber - filterer *filtering.Filterer + handler handling.Handler forwarder *forwarding.Forwarder } -func New( - hcfg handling.Config, - scrubber *scrubbing.Scrubber, - filterer *filtering.Filterer, - forwarder *forwarding.Forwarder, -) *Pipeline { - return &Pipeline{ - hcfg: hcfg, - scrubber: scrubber, - filterer: filterer, - forwarder: forwarder, - } +func New(handler handling.Handler, forwarder *forwarding.Forwarder) Pipeline { + return Pipeline{handler: handler, forwarder: forwarder} } -func (p *Pipeline) Start( +func (p Pipeline) Start( ctx context.Context, - events []parsing.Event, + event parsing.Event, ) error { - contentType := events[0].ContentType - if contentType == parsing.ContentTypeRetry { - if err := p.forwarder.Retry(ctx); err != nil { - return fmt.Errorf("retry: %w", err) - } - return nil - } - eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) - eg.Go(func() error { defer close(entries) - for _, parsedEvent := range events { - handler, err := handling.NewHandler(ctx, p.hcfg, p.scrubber, p.filterer, parsedEvent.ContentType) - if err != nil { - return fmt.Errorf("new handler: %w", err) - } - - if err := handler.Handle(ctx, parsedEvent.Payload, entries); err != nil { - return fmt.Errorf("handle: %w", err) - } + if err := p.handler.Handle(ctx, event.Payload, entries); err != nil { + return fmt.Errorf("handle: %w", err) } return nil }) - err := p.forwarder.Start(ctx, entries, forwarding.StorageTag(contentType)) + err := p.forwarder.Start(ctx, entries, forwarding.StorageTag(event.ContentType)) if waitErr := eg.Wait(); waitErr != nil { return errors.Join(err, waitErr) } From 018b6671a72326b9d5042f1d3fd59f2874879b43 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 19 Jun 2026 13:03:47 +0200 Subject: [PATCH 2/5] if sqs, lets use a different path + use its own type --- aws/logs_monitoring_go/cmd/forwarder/main.go | 54 +++-------- .../internal/parsing/content.go | 5 +- .../internal/parsing/eventbridge.go | 12 +-- .../internal/parsing/parse.go | 51 +++++----- .../internal/parsing/parse_test.go | 96 ++++++++++++------- .../internal/parsing/sns.go | 23 +++-- .../internal/parsing/sqs.go | 27 ++++-- .../internal/pipeline/pipeline.go | 63 ++++++++++-- 8 files changed, 191 insertions(+), 140 deletions(-) diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index d57f926dd..c92a221d0 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,7 +8,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "log" "log/slog" @@ -24,7 +23,6 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing" - awsevents "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) @@ -52,15 +50,6 @@ func main() { func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.RawMessage) (any, error) { return func(ctx context.Context, awsevent json.RawMessage) (any, error) { - events, err := parsing.Parse(awsevent) - if err != nil { - return nil, fmt.Errorf("parse: %w", err) - } - - if len(events) == 0 { - return nil, errors.New("no events to process") - } - filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude) scrubber := scrubbing.NewScrubber(cfg.ScrubbingRegex, cfg.ScrubbingReplacement, cfg.ScrubIP, cfg.ScrubEmail) handlerCfg := handling.Config{ @@ -70,7 +59,6 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R Tags: cfg.Tags, S3MultilineLogRegex: cfg.S3MultilineLogRegex, } - forwarderCfg := forwarding.Config{ APIKey: cfg.APIKey, IntakeURL: cfg.IntakeURL, @@ -78,10 +66,11 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R } var storage storing.Storage + var storageErr error if cfg.StoreOnFail { storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName} - if storage, err = storing.NewStorage(ctx, storageOpts); err != nil { - return nil, fmt.Errorf("new storage: %w", err) + if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil { + return nil, fmt.Errorf("new storage: %w", storageErr) } } @@ -90,41 +79,22 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R httpclient.Client, storage, ) + pipeline := pipeline.New(handlerCfg, scrubber, filterer, forwarder) - for i, event := range events { - if event.ContentType == parsing.ContentTypeRetry { - if err := forwarder.Retry(ctx); err != nil { - return nil, fmt.Errorf("retry: %w", err) - } - return nil, nil - } - - handler, err := handling.NewHandler(handlerCfg, scrubber, filterer, event.ContentType) + if parsing.IsSQS(awsevent) { + events, err := parsing.SQS(awsevent) if err != nil { - return nil, fmt.Errorf("new handler: %w", err) - } - - err = pipeline.New(handler, forwarder).Start(ctx, event) - if err == nil { - continue - } - - if event.SQSReceiptHandle != "" { return nil, err } - var response awsevents.SQSEventResponse - for _, remaining := range events[i:] { - response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) - } + return pipeline.StartSQS(ctx, events) + } - sqsBatchResponse, marshallErr := json.Marshal(response) - if marshallErr != nil { - return nil, errors.Join(err, marshallErr) - } - return sqsBatchResponse, err + event, err := parsing.Parse(awsevent) + if err != nil { + return nil, fmt.Errorf("parse: %w", err) } - return nil, nil + return nil, pipeline.Start(ctx, event) } } diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go index 8818e7d62..9a98d62e3 100644 --- a/aws/logs_monitoring_go/internal/parsing/content.go +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -21,7 +21,6 @@ const ( ) type Event struct { - ContentType ContentType - Payload json.RawMessage - SQSReceiptHandle string + ContentType ContentType + Payload json.RawMessage } diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go index 823fd8e23..96cc5bcfd 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -22,26 +22,26 @@ type s3EventBridgeDetail struct { } `json:"object"` } -func eventBridge(event json.RawMessage) ([]Event, error) { +func eventBridge(event json.RawMessage) (Event, error) { var eventBridgeEvent events.EventBridgeEvent if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { var s3eb s3EventBridgeDetail if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } payload, err := mapS3EventBridge(s3eb) if err != nil { - return nil, err + return Event{}, err } - return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil + return Event{ContentType: ContentTypeS3, Payload: payload}, nil } - return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil + return Event{ContentType: ContentTypeEventBridge, Payload: event}, nil } func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go index 537771cd6..f12b5b6bd 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse.go +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -20,67 +20,64 @@ const ( type eventDiscriminator struct { AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs + recordsDiscriminator + Detail json.RawMessage `json:"detail"` // EventBridge + Retry json.RawMessage `json:"retry"` +} + +type recordsDiscriminator struct { Records []struct { EventSource string `json:"eventSource"` } `json:"Records"` // S3, SQS, SNS - Detail json.RawMessage `json:"detail"` // EventBridge - Retry json.RawMessage `json:"retry"` } -func Parse(event json.RawMessage) ([]Event, error) { +func Parse(event json.RawMessage) (Event, error) { var disc eventDiscriminator if err := json.Unmarshal(event, &disc); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } switch { case disc.AWSLogs != nil: - return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + return Event{ContentType: ContentTypeCloudwatchLogs, Payload: event}, nil case disc.Retry != nil: - return []Event{{ContentType: ContentTypeRetry}}, nil + return Event{ContentType: ContentTypeRetry}, nil case len(disc.Records) > 0: - parsed, err := records(event, disc) + event, err := records(event, disc) if err != nil { - return nil, fmt.Errorf("records: %w", err) + return Event{}, fmt.Errorf("records: %w", err) } - return parsed, nil + return event, nil case disc.Detail != nil: - parsed, err := eventBridge(event) + event, err := eventBridge(event) if err != nil { - return nil, fmt.Errorf("eventbridge: %w", err) + return Event{}, fmt.Errorf("eventbridge: %w", err) } - return parsed, nil + return event, nil } - return nil, errors.New("unsupported event") + return Event{}, errors.New("unsupported event") } -func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { +func records(event json.RawMessage, disc eventDiscriminator) (Event, error) { switch disc.Records[0].EventSource { case eventSourceS3: - return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil + return Event{ContentType: ContentTypeS3, Payload: event}, nil case eventSourceKinesis: - return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil - - case eventSourceSQS: - parsed, err := sqs(event) - if err != nil { - return nil, fmt.Errorf("sqs: %w", err) - } - return parsed, nil + return Event{ContentType: ContentTypeKinesis, Payload: event}, nil case eventSourceSNS: - parsed, err := sns(event) + event, err := sns(event) if err != nil { - return nil, fmt.Errorf("sns: %w", err) + return Event{}, fmt.Errorf("sns: %w", err) } - return parsed, nil + return event, nil default: - return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) + return Event{}, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) } } diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go index 2566d5604..a5b108a9b 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -18,102 +18,130 @@ func TestParse(t *testing.T) { tests := map[string]struct { event json.RawMessage - want []ContentType + want ContentType wantErr bool }{ "cloudwatch logs": { event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - want: []ContentType{ContentTypeCloudwatchLogs}, + want: ContentTypeCloudwatchLogs, }, "s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "kinesis": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`), - want: []ContentType{ContentTypeKinesis}, + want: ContentTypeKinesis, }, "eventbridge generic": { event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "eventbridge s3": { event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "eventbridge ec2": { event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "eventbridge s3 without object created": { event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "sns with s3": { event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "sns standalone": { event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}]}`), - want: []ContentType{ContentTypeSNS}, + want: ContentTypeSNS, + }, + "empty object": { + event: json.RawMessage(`{}`), + wantErr: true, + }, + "unsupported source": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), + wantErr: true, + }, + "not JSON": { + event: json.RawMessage(`not json`), + wantErr: true, }, - "sqs with direct s3": { + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + got, err := Parse(tc.event) + + if tc.wantErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.want, got.ContentType) + assert.NotEmpty(t, got.Payload) + }) + } +} + +func TestSQS(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + event json.RawMessage + want []ContentType + wantErr bool + }{ + "direct s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with sns s3": { + "sns s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with multiple records": { + "multiple records": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + `]}`), want: []ContentType{ContentTypeS3, ContentTypeS3}, }, - "sqs with sns standalone": { + "sns standalone": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"hello world\"}"}]}`), want: []ContentType{ContentTypeSNS}, }, - "sqs with subscription confirmation skipped": { + "subscription confirmation skipped": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"SubscriptionConfirmation\",\"Message\":\"confirm\"}"}]}`), wantErr: true, }, - "sqs mixed valid and unrecognized": { + "mixed valid and unrecognized": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + `]}`), wantErr: true, }, - "sqs with extra fields after body": { + "extra fields after body": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with malformed body json": { + "malformed body json": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"not json"}]}`), wantErr: true, }, - "empty object": { - event: json.RawMessage(`{}`), - wantErr: true, - }, - "unsupported source": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), - wantErr: true, - }, - "not JSON": { - event: json.RawMessage(`not json`), - wantErr: true, - }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - got, err := Parse(tc.event) + got, err := SQS(tc.event) if tc.wantErr { require.Error(t, err) @@ -123,9 +151,9 @@ func TestParse(t *testing.T) { require.NoError(t, err) require.Len(t, got, len(tc.want)) - for i, pe := range got { - assert.Equal(t, tc.want[i], pe.ContentType) - assert.NotEmpty(t, pe.Payload) + for i, se := range got { + assert.Equal(t, tc.want[i], se.ContentType) + assert.NotEmpty(t, se.Payload) } }) } diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index 19331edb6..cf4847859 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -7,29 +7,28 @@ package parsing import ( "encoding/json" + "errors" "fmt" "github.com/aws/aws-lambda-go/events" ) -func sns(event json.RawMessage) ([]Event, error) { +func sns(event json.RawMessage) (Event, error) { var snsEvent events.SNSEvent if err := json.Unmarshal(event, &snsEvent); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } - var events []Event - for _, record := range snsEvent.Records { - inner := json.RawMessage(record.SNS.Message) + inner := json.RawMessage(snsEvent.Records[0].SNS.Message) - var disc eventDiscriminator - if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { - events = append(events, Event{ContentType: ContentTypeS3, Payload: inner}) - continue - } + var disc recordsDiscriminator + if err := json.Unmarshal(inner, &disc); err != nil { + return Event{ContentType: ContentTypeSNS, Payload: inner}, nil + } - events = append(events, Event{ContentType: ContentTypeSNS, Payload: event}) + if len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { + return Event{ContentType: ContentTypeS3, Payload: inner}, nil } - return events, nil + return Event{}, errors.New("") } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index 99955774a..eb6165dd3 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -10,30 +10,41 @@ import ( "errors" "fmt" - "github.com/aws/aws-lambda-go/events" + awsevents "github.com/aws/aws-lambda-go/events" ) +type SQSEvent struct { + Event + SQSReceiptHandle string +} + type sqsBodyDiscriminator struct { Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html eventDiscriminator } -func sqs(event json.RawMessage) ([]Event, error) { - var sqsEvent events.SQSEvent - if err := json.Unmarshal(event, &sqsEvent); err != nil { +func IsSQS(event json.RawMessage) bool { + var disc recordsDiscriminator + if err := json.Unmarshal(event, &disc); err != nil { + return false + } + return len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceSQS +} + +func SQS(awsevent json.RawMessage) (events []SQSEvent, err error) { + var sqsEvent awsevents.SQSEvent + if err := json.Unmarshal(awsevent, &sqsEvent); err != nil { return nil, fmt.Errorf("unmarshal: %w", err) } - var events []Event for _, msg := range sqsEvent.Records { - e, err := sqsBody(msg.Body) + event, err := sqsBody(msg.Body) if err != nil { return nil, err } - e.SQSReceiptHandle = msg.ReceiptHandle - events = append(events, e) + events = append(events, SQSEvent{Event: event, SQSReceiptHandle: msg.ReceiptHandle}) } return events, nil diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 17d47017f..88d12e599 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -7,35 +7,61 @@ package pipeline import ( "context" + "encoding/json" "errors" "fmt" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "golang.org/x/sync/errgroup" + + awsevents "github.com/aws/aws-lambda-go/events" ) type Pipeline struct { - handler handling.Handler + hcfg handling.Config + scrubber *scrubbing.Scrubber + filterer *filtering.Filterer forwarder *forwarding.Forwarder } -func New(handler handling.Handler, forwarder *forwarding.Forwarder) Pipeline { - return Pipeline{handler: handler, forwarder: forwarder} +func New( + hcfg handling.Config, + scrubber *scrubbing.Scrubber, + filterer *filtering.Filterer, + forwarder *forwarding.Forwarder, +) *Pipeline { + return &Pipeline{ + hcfg: hcfg, + scrubber: scrubber, + filterer: filterer, + forwarder: forwarder, + } } -func (p Pipeline) Start( - ctx context.Context, - event parsing.Event, -) error { +func (p Pipeline) Start(ctx context.Context, event parsing.Event) error { + if event.ContentType == parsing.ContentTypeRetry { + if err := p.forwarder.Retry(ctx); err != nil { + return fmt.Errorf("retry: %w", err) + } + return nil + } + eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) eg.Go(func() error { defer close(entries) - if err := p.handler.Handle(ctx, event.Payload, entries); err != nil { + handler, err := handling.NewHandler(p.hcfg, p.scrubber, p.filterer, event.ContentType) + if err != nil { + return fmt.Errorf("new handler: %w", err) + } + + if err := handler.Handle(ctx, event.Payload, entries); err != nil { return fmt.Errorf("handle: %w", err) } return nil @@ -47,3 +73,24 @@ func (p Pipeline) Start( } return err } + +func (p Pipeline) StartSQS(ctx context.Context, events []parsing.SQSEvent) (sqsEventResponse json.RawMessage, err error) { + var response awsevents.SQSEventResponse + for i, event := range events { + err = p.Start(ctx, event.Event) + if err == nil { + continue + } + + for _, remaining := range events[i:] { + response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) + } + + break + } + + sqsEventResponse, marshallErr := json.Marshal(response) + err = errors.Join(err, marshallErr) + + return sqsEventResponse, err +} From 121c54098d744060377244fb8abf1324aa2a97c9 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 19 Jun 2026 14:22:18 +0200 Subject: [PATCH 3/5] move parsing logic down to the pipeline --- aws/logs_monitoring_go/cmd/forwarder/main.go | 18 +---- .../internal/pipeline/pipeline.go | 70 ++++++++++++------- 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index c92a221d0..98fcf02a9 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -18,7 +18,6 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/httpclient" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing" @@ -79,22 +78,7 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R httpclient.Client, storage, ) - pipeline := pipeline.New(handlerCfg, scrubber, filterer, forwarder) - if parsing.IsSQS(awsevent) { - events, err := parsing.SQS(awsevent) - if err != nil { - return nil, err - } - - return pipeline.StartSQS(ctx, events) - } - - event, err := parsing.Parse(awsevent) - if err != nil { - return nil, fmt.Errorf("parse: %w", err) - } - - return nil, pipeline.Start(ctx, event) + return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, awsevent) } } diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 88d12e599..e90cdfffc 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -34,8 +34,8 @@ func New( scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, forwarder *forwarding.Forwarder, -) *Pipeline { - return &Pipeline{ +) Pipeline { + return Pipeline{ hcfg: hcfg, scrubber: scrubber, filterer: filterer, @@ -43,14 +43,50 @@ func New( } } -func (p Pipeline) Start(ctx context.Context, event parsing.Event) error { +func (p Pipeline) Start(ctx context.Context, awsevent json.RawMessage) (any, error) { + if parsing.IsSQS(awsevent) { + events, err := parsing.SQS(awsevent) + if err != nil { + return nil, err + } + + return p.startSQS(ctx, events) + } + + event, err := parsing.Parse(awsevent) + if err != nil { + return nil, fmt.Errorf("parse: %w", err) + } + if event.ContentType == parsing.ContentTypeRetry { - if err := p.forwarder.Retry(ctx); err != nil { - return fmt.Errorf("retry: %w", err) + return nil, p.forwarder.Retry(ctx) + } + + return nil, p.run(ctx, event) +} + +func (p Pipeline) startSQS(ctx context.Context, events []parsing.SQSEvent) (sqsEventResponse json.RawMessage, err error) { + var response awsevents.SQSEventResponse + for i, event := range events { + err = p.run(ctx, event.Event) + if err == nil { + continue } - return nil + + for _, remaining := range events[i:] { + response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) + } + + break } + sqsEventResponse, marshallErr := json.Marshal(response) + err = errors.Join(err, marshallErr) + + return sqsEventResponse, err +} + +func (p Pipeline) run(ctx context.Context, event parsing.Event) error { eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) @@ -71,26 +107,6 @@ func (p Pipeline) Start(ctx context.Context, event parsing.Event) error { if waitErr := eg.Wait(); waitErr != nil { return errors.Join(err, waitErr) } - return err -} - -func (p Pipeline) StartSQS(ctx context.Context, events []parsing.SQSEvent) (sqsEventResponse json.RawMessage, err error) { - var response awsevents.SQSEventResponse - for i, event := range events { - err = p.Start(ctx, event.Event) - if err == nil { - continue - } - - for _, remaining := range events[i:] { - response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) - } - - break - } - - sqsEventResponse, marshallErr := json.Marshal(response) - err = errors.Join(err, marshallErr) - return sqsEventResponse, err + return err } From 47fdde3d80dbe2eb803b8d60055d598f34bf29c1 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 19 Jun 2026 14:25:21 +0200 Subject: [PATCH 4/5] fix messages on errors --- aws/logs_monitoring_go/internal/parsing/sns.go | 3 +-- aws/logs_monitoring_go/internal/parsing/sqs.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index cf4847859..654f55bbe 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -7,7 +7,6 @@ package parsing import ( "encoding/json" - "errors" "fmt" "github.com/aws/aws-lambda-go/events" @@ -30,5 +29,5 @@ func sns(event json.RawMessage) (Event, error) { return Event{ContentType: ContentTypeS3, Payload: inner}, nil } - return Event{}, errors.New("") + return Event{}, fmt.Errorf("unknown event: %q", disc.Records[0].EventSource) } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index eb6165dd3..ce3acc16b 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -7,7 +7,6 @@ package parsing import ( "encoding/json" - "errors" "fmt" awsevents "github.com/aws/aws-lambda-go/events" @@ -71,5 +70,5 @@ func sqsBody(body string) (Event, error) { return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return Event{}, errors.New("unknown event") + return Event{}, fmt.Errorf("unknown event: %q", disc.Records[0].EventSource) } From 94ed26f31be830a9d89a1dd2269ef402e94dab4e Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 19 Jun 2026 15:06:33 +0200 Subject: [PATCH 5/5] fixup! fix messages on errors --- aws/logs_monitoring_go/internal/parsing/sqs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index ce3acc16b..86674c93f 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -20,7 +20,7 @@ type SQSEvent struct { type sqsBodyDiscriminator struct { Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html - eventDiscriminator + recordsDiscriminator } func IsSQS(event json.RawMessage) bool { @@ -62,7 +62,7 @@ func sqsBody(body string) (Event, error) { return Event{ContentType: ContentTypeS3, Payload: raw}, nil case disc.Type == "Notification" && disc.Message != "": - var innerDisc eventDiscriminator + var innerDisc recordsDiscriminator if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } @@ -70,5 +70,5 @@ func sqsBody(body string) (Event, error) { return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return Event{}, fmt.Errorf("unknown event: %q", disc.Records[0].EventSource) + return Event{}, fmt.Errorf("unknown event") }