diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index ac2717ee6..98fcf02a9 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,7 +8,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "log" "log/slog" @@ -19,7 +18,6 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/httpclient" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing" @@ -49,17 +47,8 @@ func main() { lambda.Start(handleRequest(cfg)) } -func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { - return func(ctx context.Context, event json.RawMessage) error { - parsed, err := parsing.Parse(event) - if err != nil { - return fmt.Errorf("parse: %w", err) - } - - if len(parsed) == 0 { - return errors.New("no events to process") - } - +func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.RawMessage) (any, error) { + return func(ctx context.Context, awsevent json.RawMessage) (any, error) { filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude) scrubber := scrubbing.NewScrubber(cfg.ScrubbingRegex, cfg.ScrubbingReplacement, cfg.ScrubIP, cfg.ScrubEmail) handlerCfg := handling.Config{ @@ -69,7 +58,6 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM Tags: cfg.Tags, S3MultilineLogRegex: cfg.S3MultilineLogRegex, } - forwarderCfg := forwarding.Config{ APIKey: cfg.APIKey, IntakeURL: cfg.IntakeURL, @@ -77,10 +65,11 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM } var storage storing.Storage + var storageErr error if cfg.StoreOnFail { storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName} - if storage, err = storing.NewStorage(ctx, storageOpts); err != nil { - return fmt.Errorf("new storage: %w", err) + if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil { + return nil, fmt.Errorf("new storage: %w", storageErr) } } @@ -90,6 +79,6 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM storage, ) - return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, parsed) + return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, awsevent) } } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index c30ee8bbf..bf44b0480 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -122,7 +122,7 @@ func (f *Forwarder) Retry(ctx context.Context) error { for _, key := range keys { payload, storageTag, getErr := f.storage.Get(ctx, key) if getErr != nil { - return fmt.Errorf("list: %w", getErr) + return fmt.Errorf("get: %w", getErr) } if sendErr := f.send(ctx, payload, storageTag); sendErr != nil { diff --git a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go index 92376b70a..5bd1a02c0 100644 --- a/aws/logs_monitoring_go/internal/handling/handler.go +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -30,7 +30,7 @@ type Config struct { S3MultilineLogRegex *regexp.Regexp } -func NewHandler(ctx context.Context, hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) { +func NewHandler(hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) { switch ct { case parsing.ContentTypeCloudwatchLogs: diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go index 8818e7d62..9a98d62e3 100644 --- a/aws/logs_monitoring_go/internal/parsing/content.go +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -21,7 +21,6 @@ const ( ) type Event struct { - ContentType ContentType - Payload json.RawMessage - SQSReceiptHandle string + ContentType ContentType + Payload json.RawMessage } diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go index 823fd8e23..96cc5bcfd 100644 --- a/aws/logs_monitoring_go/internal/parsing/eventbridge.go +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -22,26 +22,26 @@ type s3EventBridgeDetail struct { } `json:"object"` } -func eventBridge(event json.RawMessage) ([]Event, error) { +func eventBridge(event json.RawMessage) (Event, error) { var eventBridgeEvent events.EventBridgeEvent if err := json.Unmarshal(event, &eventBridgeEvent); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") { var s3eb s3EventBridgeDetail if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } payload, err := mapS3EventBridge(s3eb) if err != nil { - return nil, err + return Event{}, err } - return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil + return Event{ContentType: ContentTypeS3, Payload: payload}, nil } - return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil + return Event{ContentType: ContentTypeEventBridge, Payload: event}, nil } func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) { diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go index 537771cd6..f12b5b6bd 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse.go +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -20,67 +20,64 @@ const ( type eventDiscriminator struct { AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs + recordsDiscriminator + Detail json.RawMessage `json:"detail"` // EventBridge + Retry json.RawMessage `json:"retry"` +} + +type recordsDiscriminator struct { Records []struct { EventSource string `json:"eventSource"` } `json:"Records"` // S3, SQS, SNS - Detail json.RawMessage `json:"detail"` // EventBridge - Retry json.RawMessage `json:"retry"` } -func Parse(event json.RawMessage) ([]Event, error) { +func Parse(event json.RawMessage) (Event, error) { var disc eventDiscriminator if err := json.Unmarshal(event, &disc); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } switch { case disc.AWSLogs != nil: - return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + return Event{ContentType: ContentTypeCloudwatchLogs, Payload: event}, nil case disc.Retry != nil: - return []Event{{ContentType: ContentTypeRetry}}, nil + return Event{ContentType: ContentTypeRetry}, nil case len(disc.Records) > 0: - parsed, err := records(event, disc) + event, err := records(event, disc) if err != nil { - return nil, fmt.Errorf("records: %w", err) + return Event{}, fmt.Errorf("records: %w", err) } - return parsed, nil + return event, nil case disc.Detail != nil: - parsed, err := eventBridge(event) + event, err := eventBridge(event) if err != nil { - return nil, fmt.Errorf("eventbridge: %w", err) + return Event{}, fmt.Errorf("eventbridge: %w", err) } - return parsed, nil + return event, nil } - return nil, errors.New("unsupported event") + return Event{}, errors.New("unsupported event") } -func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) { +func records(event json.RawMessage, disc eventDiscriminator) (Event, error) { switch disc.Records[0].EventSource { case eventSourceS3: - return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil + return Event{ContentType: ContentTypeS3, Payload: event}, nil case eventSourceKinesis: - return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil - - case eventSourceSQS: - parsed, err := sqs(event) - if err != nil { - return nil, fmt.Errorf("sqs: %w", err) - } - return parsed, nil + return Event{ContentType: ContentTypeKinesis, Payload: event}, nil case eventSourceSNS: - parsed, err := sns(event) + event, err := sns(event) if err != nil { - return nil, fmt.Errorf("sns: %w", err) + return Event{}, fmt.Errorf("sns: %w", err) } - return parsed, nil + return event, nil default: - return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) + return Event{}, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource) } } diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go index 2566d5604..a5b108a9b 100644 --- a/aws/logs_monitoring_go/internal/parsing/parse_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -18,102 +18,130 @@ func TestParse(t *testing.T) { tests := map[string]struct { event json.RawMessage - want []ContentType + want ContentType wantErr bool }{ "cloudwatch logs": { event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - want: []ContentType{ContentTypeCloudwatchLogs}, + want: ContentTypeCloudwatchLogs, }, "s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "kinesis": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`), - want: []ContentType{ContentTypeKinesis}, + want: ContentTypeKinesis, }, "eventbridge generic": { event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "eventbridge s3": { event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "eventbridge ec2": { event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "eventbridge s3 without object created": { event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`), - want: []ContentType{ContentTypeEventBridge}, + want: ContentTypeEventBridge, }, "sns with s3": { event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`), - want: []ContentType{ContentTypeS3}, + want: ContentTypeS3, }, "sns standalone": { event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}]}`), - want: []ContentType{ContentTypeSNS}, + want: ContentTypeSNS, + }, + "empty object": { + event: json.RawMessage(`{}`), + wantErr: true, + }, + "unsupported source": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), + wantErr: true, + }, + "not JSON": { + event: json.RawMessage(`not json`), + wantErr: true, }, - "sqs with direct s3": { + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + got, err := Parse(tc.event) + + if tc.wantErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.want, got.ContentType) + assert.NotEmpty(t, got.Payload) + }) + } +} + +func TestSQS(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + event json.RawMessage + want []ContentType + wantErr bool + }{ + "direct s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with sns s3": { + "sns s3": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with multiple records": { + "multiple records": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` + `]}`), want: []ContentType{ContentTypeS3, ContentTypeS3}, }, - "sqs with sns standalone": { + "sns standalone": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"hello world\"}"}]}`), want: []ContentType{ContentTypeSNS}, }, - "sqs with subscription confirmation skipped": { + "subscription confirmation skipped": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"SubscriptionConfirmation\",\"Message\":\"confirm\"}"}]}`), wantErr: true, }, - "sqs mixed valid and unrecognized": { + "mixed valid and unrecognized": { event: json.RawMessage(`{"Records":[` + `{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` + `{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` + `]}`), wantErr: true, }, - "sqs with extra fields after body": { + "extra fields after body": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`), want: []ContentType{ContentTypeS3}, }, - "sqs with malformed body json": { + "malformed body json": { event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"not json"}]}`), wantErr: true, }, - "empty object": { - event: json.RawMessage(`{}`), - wantErr: true, - }, - "unsupported source": { - event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), - wantErr: true, - }, - "not JSON": { - event: json.RawMessage(`not json`), - wantErr: true, - }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - got, err := Parse(tc.event) + got, err := SQS(tc.event) if tc.wantErr { require.Error(t, err) @@ -123,9 +151,9 @@ func TestParse(t *testing.T) { require.NoError(t, err) require.Len(t, got, len(tc.want)) - for i, pe := range got { - assert.Equal(t, tc.want[i], pe.ContentType) - assert.NotEmpty(t, pe.Payload) + for i, se := range got { + assert.Equal(t, tc.want[i], se.ContentType) + assert.NotEmpty(t, se.Payload) } }) } diff --git a/aws/logs_monitoring_go/internal/parsing/sns.go b/aws/logs_monitoring_go/internal/parsing/sns.go index 19331edb6..654f55bbe 100644 --- a/aws/logs_monitoring_go/internal/parsing/sns.go +++ b/aws/logs_monitoring_go/internal/parsing/sns.go @@ -12,24 +12,22 @@ import ( "github.com/aws/aws-lambda-go/events" ) -func sns(event json.RawMessage) ([]Event, error) { +func sns(event json.RawMessage) (Event, error) { var snsEvent events.SNSEvent if err := json.Unmarshal(event, &snsEvent); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + return Event{}, fmt.Errorf("unmarshal: %w", err) } - var events []Event - for _, record := range snsEvent.Records { - inner := json.RawMessage(record.SNS.Message) + inner := json.RawMessage(snsEvent.Records[0].SNS.Message) - var disc eventDiscriminator - if err := json.Unmarshal(inner, &disc); err == nil && len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { - events = append(events, Event{ContentType: ContentTypeS3, Payload: inner}) - continue - } + var disc recordsDiscriminator + if err := json.Unmarshal(inner, &disc); err != nil { + return Event{ContentType: ContentTypeSNS, Payload: inner}, nil + } - events = append(events, Event{ContentType: ContentTypeSNS, Payload: event}) + if len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceS3 { + return Event{ContentType: ContentTypeS3, Payload: inner}, nil } - return events, nil + return Event{}, fmt.Errorf("unknown event: %q", disc.Records[0].EventSource) } diff --git a/aws/logs_monitoring_go/internal/parsing/sqs.go b/aws/logs_monitoring_go/internal/parsing/sqs.go index 99955774a..86674c93f 100644 --- a/aws/logs_monitoring_go/internal/parsing/sqs.go +++ b/aws/logs_monitoring_go/internal/parsing/sqs.go @@ -7,33 +7,43 @@ package parsing import ( "encoding/json" - "errors" "fmt" - "github.com/aws/aws-lambda-go/events" + awsevents "github.com/aws/aws-lambda-go/events" ) +type SQSEvent struct { + Event + SQSReceiptHandle string +} + type sqsBodyDiscriminator struct { Type string `json:"Type"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html Message string `json:"Message"` // SNS inside SQS when raw message delivery disabled. See https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html - eventDiscriminator + recordsDiscriminator +} + +func IsSQS(event json.RawMessage) bool { + var disc recordsDiscriminator + if err := json.Unmarshal(event, &disc); err != nil { + return false + } + return len(disc.Records) > 0 && disc.Records[0].EventSource == eventSourceSQS } -func sqs(event json.RawMessage) ([]Event, error) { - var sqsEvent events.SQSEvent - if err := json.Unmarshal(event, &sqsEvent); err != nil { +func SQS(awsevent json.RawMessage) (events []SQSEvent, err error) { + var sqsEvent awsevents.SQSEvent + if err := json.Unmarshal(awsevent, &sqsEvent); err != nil { return nil, fmt.Errorf("unmarshal: %w", err) } - var events []Event for _, msg := range sqsEvent.Records { - e, err := sqsBody(msg.Body) + event, err := sqsBody(msg.Body) if err != nil { return nil, err } - e.SQSReceiptHandle = msg.ReceiptHandle - events = append(events, e) + events = append(events, SQSEvent{Event: event, SQSReceiptHandle: msg.ReceiptHandle}) } return events, nil @@ -52,7 +62,7 @@ func sqsBody(body string) (Event, error) { return Event{ContentType: ContentTypeS3, Payload: raw}, nil case disc.Type == "Notification" && disc.Message != "": - var innerDisc eventDiscriminator + var innerDisc recordsDiscriminator if err := json.Unmarshal([]byte(disc.Message), &innerDisc); err == nil && len(innerDisc.Records) > 0 && innerDisc.Records[0].EventSource == eventSourceS3 { return Event{ContentType: ContentTypeS3, Payload: json.RawMessage(disc.Message)}, nil } @@ -60,5 +70,5 @@ func sqsBody(body string) (Event, error) { return Event{ContentType: ContentTypeSNS, Payload: raw}, nil } - return Event{}, errors.New("unknown event") + return Event{}, fmt.Errorf("unknown event") } diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 33e6df027..e90cdfffc 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -7,6 +7,7 @@ package pipeline import ( "context" + "encoding/json" "errors" "fmt" @@ -17,6 +18,8 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" "golang.org/x/sync/errgroup" + + awsevents "github.com/aws/aws-lambda-go/events" ) type Pipeline struct { @@ -31,8 +34,8 @@ func New( scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, forwarder *forwarding.Forwarder, -) *Pipeline { - return &Pipeline{ +) Pipeline { + return Pipeline{ hcfg: hcfg, scrubber: scrubber, filterer: filterer, @@ -40,40 +43,70 @@ func New( } } -func (p *Pipeline) Start( - ctx context.Context, - events []parsing.Event, -) error { - contentType := events[0].ContentType - if contentType == parsing.ContentTypeRetry { - if err := p.forwarder.Retry(ctx); err != nil { - return fmt.Errorf("retry: %w", err) +func (p Pipeline) Start(ctx context.Context, awsevent json.RawMessage) (any, error) { + if parsing.IsSQS(awsevent) { + events, err := parsing.SQS(awsevent) + if err != nil { + return nil, err } - return nil + + return p.startSQS(ctx, events) + } + + event, err := parsing.Parse(awsevent) + if err != nil { + return nil, fmt.Errorf("parse: %w", err) + } + + if event.ContentType == parsing.ContentTypeRetry { + return nil, p.forwarder.Retry(ctx) + } + + return nil, p.run(ctx, event) +} + +func (p Pipeline) startSQS(ctx context.Context, events []parsing.SQSEvent) (sqsEventResponse json.RawMessage, err error) { + var response awsevents.SQSEventResponse + for i, event := range events { + err = p.run(ctx, event.Event) + if err == nil { + continue + } + + for _, remaining := range events[i:] { + response.BatchItemFailures = append(response.BatchItemFailures, awsevents.SQSBatchItemFailure{ItemIdentifier: remaining.SQSReceiptHandle}) + } + + break } + sqsEventResponse, marshallErr := json.Marshal(response) + err = errors.Join(err, marshallErr) + + return sqsEventResponse, err +} + +func (p Pipeline) run(ctx context.Context, event parsing.Event) error { eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) - eg.Go(func() error { defer close(entries) - for _, parsedEvent := range events { - handler, err := handling.NewHandler(ctx, p.hcfg, p.scrubber, p.filterer, parsedEvent.ContentType) - if err != nil { - return fmt.Errorf("new handler: %w", err) - } - - if err := handler.Handle(ctx, parsedEvent.Payload, entries); err != nil { - return fmt.Errorf("handle: %w", err) - } + handler, err := handling.NewHandler(p.hcfg, p.scrubber, p.filterer, event.ContentType) + if err != nil { + return fmt.Errorf("new handler: %w", err) + } + + if err := handler.Handle(ctx, event.Payload, entries); err != nil { + return fmt.Errorf("handle: %w", err) } return nil }) - err := p.forwarder.Start(ctx, entries, forwarding.StorageTag(contentType)) + err := p.forwarder.Start(ctx, entries, forwarding.StorageTag(event.ContentType)) if waitErr := eg.Wait(); waitErr != nil { return errors.Join(err, waitErr) } + return err }