Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"log/slog"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/httpclient"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing"
Expand Down Expand Up @@ -49,17 +47,8 @@ func main() {
lambda.Start(handleRequest(cfg))
}

func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
parsed, err := parsing.Parse(event)
if err != nil {
return fmt.Errorf("parse: %w", err)
}

if len(parsed) == 0 {
return errors.New("no events to process")
}

func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.RawMessage) (any, error) {
return func(ctx context.Context, awsevent json.RawMessage) (any, error) {
filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude)
scrubber := scrubbing.NewScrubber(cfg.ScrubbingRegex, cfg.ScrubbingReplacement, cfg.ScrubIP, cfg.ScrubEmail)
handlerCfg := handling.Config{
Expand All @@ -69,18 +58,18 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
Tags: cfg.Tags,
S3MultilineLogRegex: cfg.S3MultilineLogRegex,
}

forwarderCfg := forwarding.Config{
APIKey: cfg.APIKey,
IntakeURL: cfg.IntakeURL,
CompressionLevel: cfg.CompressionLevel,
}

var storage storing.Storage
var storageErr error
if cfg.StoreOnFail {
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName}
if storage, err = storing.NewStorage(ctx, storageOpts); err != nil {
return fmt.Errorf("new storage: %w", err)
if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil {
return nil, fmt.Errorf("new storage: %w", storageErr)
}
}

Expand All @@ -90,6 +79,6 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
storage,
)

return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, parsed)
return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, awsevent)
}
}
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/internal/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (f *Forwarder) Retry(ctx context.Context) error {
for _, key := range keys {
payload, storageTag, getErr := f.storage.Get(ctx, key)
if getErr != nil {
return fmt.Errorf("list: %w", getErr)
return fmt.Errorf("get: %w", getErr)
}

if sendErr := f.send(ctx, payload, storageTag); sendErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/internal/handling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Config struct {
S3MultilineLogRegex *regexp.Regexp
}

func NewHandler(ctx context.Context, hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) {
func NewHandler(hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) {
switch ct {

case parsing.ContentTypeCloudwatchLogs:
Expand Down
5 changes: 2 additions & 3 deletions aws/logs_monitoring_go/internal/parsing/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
)

type Event struct {
ContentType ContentType
Payload json.RawMessage
SQSReceiptHandle string
ContentType ContentType
Payload json.RawMessage
}
12 changes: 6 additions & 6 deletions aws/logs_monitoring_go/internal/parsing/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ type s3EventBridgeDetail struct {
} `json:"object"`
}

func eventBridge(event json.RawMessage) ([]Event, error) {
func eventBridge(event json.RawMessage) (Event, error) {
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal(event, &eventBridgeEvent); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
return Event{}, fmt.Errorf("unmarshal: %w", err)
}

if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") {
var s3eb s3EventBridgeDetail
if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
return Event{}, fmt.Errorf("unmarshal: %w", err)
}

payload, err := mapS3EventBridge(s3eb)
if err != nil {
return nil, err
return Event{}, err
}

return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil
return Event{ContentType: ContentTypeS3, Payload: payload}, nil
}
return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil
return Event{ContentType: ContentTypeEventBridge, Payload: event}, nil
}

func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) {
Expand Down
51 changes: 24 additions & 27 deletions aws/logs_monitoring_go/internal/parsing/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,67 +20,64 @@ const (

type eventDiscriminator struct {
AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs
recordsDiscriminator
Detail json.RawMessage `json:"detail"` // EventBridge
Retry json.RawMessage `json:"retry"`
}

type recordsDiscriminator struct {
Records []struct {
EventSource string `json:"eventSource"`
} `json:"Records"` // S3, SQS, SNS
Detail json.RawMessage `json:"detail"` // EventBridge
Retry json.RawMessage `json:"retry"`
}

func Parse(event json.RawMessage) ([]Event, error) {
func Parse(event json.RawMessage) (Event, error) {
var disc eventDiscriminator
if err := json.Unmarshal(event, &disc); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
return Event{}, fmt.Errorf("unmarshal: %w", err)
}

switch {
case disc.AWSLogs != nil:
return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil
return Event{ContentType: ContentTypeCloudwatchLogs, Payload: event}, nil

case disc.Retry != nil:
return []Event{{ContentType: ContentTypeRetry}}, nil
return Event{ContentType: ContentTypeRetry}, nil

case len(disc.Records) > 0:
parsed, err := records(event, disc)
event, err := records(event, disc)
if err != nil {
return nil, fmt.Errorf("records: %w", err)
return Event{}, fmt.Errorf("records: %w", err)
}
return parsed, nil
return event, nil

case disc.Detail != nil:
parsed, err := eventBridge(event)
event, err := eventBridge(event)
if err != nil {
return nil, fmt.Errorf("eventbridge: %w", err)
return Event{}, fmt.Errorf("eventbridge: %w", err)
}
return parsed, nil
return event, nil
}

return nil, errors.New("unsupported event")
return Event{}, errors.New("unsupported event")
}

func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) {
func records(event json.RawMessage, disc eventDiscriminator) (Event, error) {
switch disc.Records[0].EventSource {
case eventSourceS3:
return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil
return Event{ContentType: ContentTypeS3, Payload: event}, nil

case eventSourceKinesis:
return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil

case eventSourceSQS:
parsed, err := sqs(event)
if err != nil {
return nil, fmt.Errorf("sqs: %w", err)
}
return parsed, nil
return Event{ContentType: ContentTypeKinesis, Payload: event}, nil

case eventSourceSNS:
parsed, err := sns(event)
event, err := sns(event)
if err != nil {
return nil, fmt.Errorf("sns: %w", err)
return Event{}, fmt.Errorf("sns: %w", err)
}
return parsed, nil
return event, nil

default:
return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource)
return Event{}, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource)
}
}
96 changes: 62 additions & 34 deletions aws/logs_monitoring_go/internal/parsing/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,102 +18,130 @@ func TestParse(t *testing.T) {

tests := map[string]struct {
event json.RawMessage
want []ContentType
want ContentType
wantErr bool
}{
"cloudwatch logs": {
event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`),
want: []ContentType{ContentTypeCloudwatchLogs},
want: ContentTypeCloudwatchLogs,
},
"s3": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`),
want: []ContentType{ContentTypeS3},
want: ContentTypeS3,
},
"kinesis": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`),
want: []ContentType{ContentTypeKinesis},
want: ContentTypeKinesis,
},
"eventbridge generic": {
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`),
want: []ContentType{ContentTypeEventBridge},
want: ContentTypeEventBridge,
},
"eventbridge s3": {
event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`),
want: []ContentType{ContentTypeS3},
want: ContentTypeS3,
},
"eventbridge ec2": {
event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`),
want: []ContentType{ContentTypeEventBridge},
want: ContentTypeEventBridge,
},
"eventbridge s3 without object created": {
event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`),
want: []ContentType{ContentTypeEventBridge},
want: ContentTypeEventBridge,
},
"sns with s3": {
event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`),
want: []ContentType{ContentTypeS3},
want: ContentTypeS3,
},
"sns standalone": {
event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}]}`),
want: []ContentType{ContentTypeSNS},
want: ContentTypeSNS,
},
"empty object": {
event: json.RawMessage(`{}`),
wantErr: true,
},
"unsupported source": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`),
wantErr: true,
},
"not JSON": {
event: json.RawMessage(`not json`),
wantErr: true,
},
"sqs with direct s3": {
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

got, err := Parse(tc.event)

if tc.wantErr {
require.Error(t, err)
return
}

require.NoError(t, err)
assert.Equal(t, tc.want, got.ContentType)
assert.NotEmpty(t, got.Payload)
})
}
}

func TestSQS(t *testing.T) {
t.Parallel()

tests := map[string]struct {
event json.RawMessage
want []ContentType
wantErr bool
}{
"direct s3": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`),
want: []ContentType{ContentTypeS3},
},
"sqs with sns s3": {
"sns s3": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`),
want: []ContentType{ContentTypeS3},
},
"sqs with multiple records": {
"multiple records": {
event: json.RawMessage(`{"Records":[` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` +
`]}`),
want: []ContentType{ContentTypeS3, ContentTypeS3},
},
"sqs with sns standalone": {
"sns standalone": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"hello world\"}"}]}`),
want: []ContentType{ContentTypeSNS},
},
"sqs with subscription confirmation skipped": {
"subscription confirmation skipped": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"SubscriptionConfirmation\",\"Message\":\"confirm\"}"}]}`),
wantErr: true,
},
"sqs mixed valid and unrecognized": {
"mixed valid and unrecognized": {
event: json.RawMessage(`{"Records":[` +
`{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` +
`]}`),
wantErr: true,
},
"sqs with extra fields after body": {
"extra fields after body": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`),
want: []ContentType{ContentTypeS3},
},
"sqs with malformed body json": {
"malformed body json": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"not json"}]}`),
wantErr: true,
},
"empty object": {
event: json.RawMessage(`{}`),
wantErr: true,
},
"unsupported source": {
event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`),
wantErr: true,
},
"not JSON": {
event: json.RawMessage(`not json`),
wantErr: true,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

got, err := Parse(tc.event)
got, err := SQS(tc.event)

if tc.wantErr {
require.Error(t, err)
Expand All @@ -123,9 +151,9 @@ func TestParse(t *testing.T) {
require.NoError(t, err)
require.Len(t, got, len(tc.want))

for i, pe := range got {
assert.Equal(t, tc.want[i], pe.ContentType)
assert.NotEmpty(t, pe.Payload)
for i, se := range got {
assert.Equal(t, tc.want[i], se.ContentType)
assert.NotEmpty(t, se.Payload)
}
})
}
Expand Down
Loading
Loading