Skip to content

Commit 5209713

Browse files
authored
fix(go-forwarder): prevent log duplication on SQS event failure (#1157)
1 parent c1f4f8e commit 5209713

10 files changed

Lines changed: 189 additions & 135 deletions

File tree

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package main
88
import (
99
"context"
1010
"encoding/json"
11-
"errors"
1211
"fmt"
1312
"log"
1413
"log/slog"
@@ -19,7 +18,6 @@ import (
1918
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
2019
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
2120
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/httpclient"
22-
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
2321
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
2422
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
2523
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing"
@@ -49,17 +47,8 @@ func main() {
4947
lambda.Start(handleRequest(cfg))
5048
}
5149

52-
func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
53-
return func(ctx context.Context, event json.RawMessage) error {
54-
parsed, err := parsing.Parse(event)
55-
if err != nil {
56-
return fmt.Errorf("parse: %w", err)
57-
}
58-
59-
if len(parsed) == 0 {
60-
return errors.New("no events to process")
61-
}
62-
50+
func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.RawMessage) (any, error) {
51+
return func(ctx context.Context, awsevent json.RawMessage) (any, error) {
6352
filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude)
6453
scrubber := scrubbing.NewScrubber(cfg.ScrubbingRegex, cfg.ScrubbingReplacement, cfg.ScrubIP, cfg.ScrubEmail)
6554
handlerCfg := handling.Config{
@@ -69,18 +58,18 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
6958
Tags: cfg.Tags,
7059
S3MultilineLogRegex: cfg.S3MultilineLogRegex,
7160
}
72-
7361
forwarderCfg := forwarding.Config{
7462
APIKey: cfg.APIKey,
7563
IntakeURL: cfg.IntakeURL,
7664
CompressionLevel: cfg.CompressionLevel,
7765
}
7866

7967
var storage storing.Storage
68+
var storageErr error
8069
if cfg.StoreOnFail {
8170
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName}
82-
if storage, err = storing.NewStorage(ctx, storageOpts); err != nil {
83-
return fmt.Errorf("new storage: %w", err)
71+
if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil {
72+
return nil, fmt.Errorf("new storage: %w", storageErr)
8473
}
8574
}
8675

@@ -90,6 +79,6 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
9079
storage,
9180
)
9281

93-
return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, parsed)
82+
return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, awsevent)
9483
}
9584
}

aws/logs_monitoring_go/internal/forwarding/forwarding.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (f *Forwarder) Retry(ctx context.Context) error {
122122
for _, key := range keys {
123123
payload, storageTag, getErr := f.storage.Get(ctx, key)
124124
if getErr != nil {
125-
return fmt.Errorf("list: %w", getErr)
125+
return fmt.Errorf("get: %w", getErr)
126126
}
127127

128128
if sendErr := f.send(ctx, payload, storageTag); sendErr != nil {

aws/logs_monitoring_go/internal/handling/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Config struct {
3030
S3MultilineLogRegex *regexp.Regexp
3131
}
3232

33-
func NewHandler(ctx context.Context, hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) {
33+
func NewHandler(hcfg Config, scrubber *scrubbing.Scrubber, filterer *filtering.Filterer, ct parsing.ContentType) (Handler, error) {
3434
switch ct {
3535

3636
case parsing.ContentTypeCloudwatchLogs:

aws/logs_monitoring_go/internal/parsing/content.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ const (
2121
)
2222

2323
type Event struct {
24-
ContentType ContentType
25-
Payload json.RawMessage
26-
SQSReceiptHandle string
24+
ContentType ContentType
25+
Payload json.RawMessage
2726
}

aws/logs_monitoring_go/internal/parsing/eventbridge.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,26 @@ type s3EventBridgeDetail struct {
2222
} `json:"object"`
2323
}
2424

25-
func eventBridge(event json.RawMessage) ([]Event, error) {
25+
func eventBridge(event json.RawMessage) (Event, error) {
2626
var eventBridgeEvent events.EventBridgeEvent
2727
if err := json.Unmarshal(event, &eventBridgeEvent); err != nil {
28-
return nil, fmt.Errorf("unmarshal: %w", err)
28+
return Event{}, fmt.Errorf("unmarshal: %w", err)
2929
}
3030

3131
if eventBridgeEvent.Source == "aws.s3" && strings.Contains(eventBridgeEvent.DetailType, "Object Created") {
3232
var s3eb s3EventBridgeDetail
3333
if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil {
34-
return nil, fmt.Errorf("unmarshal: %w", err)
34+
return Event{}, fmt.Errorf("unmarshal: %w", err)
3535
}
3636

3737
payload, err := mapS3EventBridge(s3eb)
3838
if err != nil {
39-
return nil, err
39+
return Event{}, err
4040
}
4141

42-
return []Event{{ContentType: ContentTypeS3, Payload: payload}}, nil
42+
return Event{ContentType: ContentTypeS3, Payload: payload}, nil
4343
}
44-
return []Event{{ContentType: ContentTypeEventBridge, Payload: event}}, nil
44+
return Event{ContentType: ContentTypeEventBridge, Payload: event}, nil
4545
}
4646

4747
func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) {

aws/logs_monitoring_go/internal/parsing/parse.go

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,67 +20,64 @@ const (
2020

2121
type eventDiscriminator struct {
2222
AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs
23+
recordsDiscriminator
24+
Detail json.RawMessage `json:"detail"` // EventBridge
25+
Retry json.RawMessage `json:"retry"`
26+
}
27+
28+
type recordsDiscriminator struct {
2329
Records []struct {
2430
EventSource string `json:"eventSource"`
2531
} `json:"Records"` // S3, SQS, SNS
26-
Detail json.RawMessage `json:"detail"` // EventBridge
27-
Retry json.RawMessage `json:"retry"`
2832
}
2933

30-
func Parse(event json.RawMessage) ([]Event, error) {
34+
func Parse(event json.RawMessage) (Event, error) {
3135
var disc eventDiscriminator
3236
if err := json.Unmarshal(event, &disc); err != nil {
33-
return nil, fmt.Errorf("unmarshal: %w", err)
37+
return Event{}, fmt.Errorf("unmarshal: %w", err)
3438
}
3539

3640
switch {
3741
case disc.AWSLogs != nil:
38-
return []Event{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil
42+
return Event{ContentType: ContentTypeCloudwatchLogs, Payload: event}, nil
3943

4044
case disc.Retry != nil:
41-
return []Event{{ContentType: ContentTypeRetry}}, nil
45+
return Event{ContentType: ContentTypeRetry}, nil
4246

4347
case len(disc.Records) > 0:
44-
parsed, err := records(event, disc)
48+
event, err := records(event, disc)
4549
if err != nil {
46-
return nil, fmt.Errorf("records: %w", err)
50+
return Event{}, fmt.Errorf("records: %w", err)
4751
}
48-
return parsed, nil
52+
return event, nil
4953

5054
case disc.Detail != nil:
51-
parsed, err := eventBridge(event)
55+
event, err := eventBridge(event)
5256
if err != nil {
53-
return nil, fmt.Errorf("eventbridge: %w", err)
57+
return Event{}, fmt.Errorf("eventbridge: %w", err)
5458
}
55-
return parsed, nil
59+
return event, nil
5660
}
5761

58-
return nil, errors.New("unsupported event")
62+
return Event{}, errors.New("unsupported event")
5963
}
6064

61-
func records(event json.RawMessage, disc eventDiscriminator) ([]Event, error) {
65+
func records(event json.RawMessage, disc eventDiscriminator) (Event, error) {
6266
switch disc.Records[0].EventSource {
6367
case eventSourceS3:
64-
return []Event{{ContentType: ContentTypeS3, Payload: event}}, nil
68+
return Event{ContentType: ContentTypeS3, Payload: event}, nil
6569

6670
case eventSourceKinesis:
67-
return []Event{{ContentType: ContentTypeKinesis, Payload: event}}, nil
68-
69-
case eventSourceSQS:
70-
parsed, err := sqs(event)
71-
if err != nil {
72-
return nil, fmt.Errorf("sqs: %w", err)
73-
}
74-
return parsed, nil
71+
return Event{ContentType: ContentTypeKinesis, Payload: event}, nil
7572

7673
case eventSourceSNS:
77-
parsed, err := sns(event)
74+
event, err := sns(event)
7875
if err != nil {
79-
return nil, fmt.Errorf("sns: %w", err)
76+
return Event{}, fmt.Errorf("sns: %w", err)
8077
}
81-
return parsed, nil
78+
return event, nil
8279

8380
default:
84-
return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource)
81+
return Event{}, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource)
8582
}
8683
}

aws/logs_monitoring_go/internal/parsing/parse_test.go

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,102 +18,130 @@ func TestParse(t *testing.T) {
1818

1919
tests := map[string]struct {
2020
event json.RawMessage
21-
want []ContentType
21+
want ContentType
2222
wantErr bool
2323
}{
2424
"cloudwatch logs": {
2525
event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`),
26-
want: []ContentType{ContentTypeCloudwatchLogs},
26+
want: ContentTypeCloudwatchLogs,
2727
},
2828
"s3": {
2929
event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`),
30-
want: []ContentType{ContentTypeS3},
30+
want: ContentTypeS3,
3131
},
3232
"kinesis": {
3333
event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`),
34-
want: []ContentType{ContentTypeKinesis},
34+
want: ContentTypeKinesis,
3535
},
3636
"eventbridge generic": {
3737
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`),
38-
want: []ContentType{ContentTypeEventBridge},
38+
want: ContentTypeEventBridge,
3939
},
4040
"eventbridge s3": {
4141
event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`),
42-
want: []ContentType{ContentTypeS3},
42+
want: ContentTypeS3,
4343
},
4444
"eventbridge ec2": {
4545
event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`),
46-
want: []ContentType{ContentTypeEventBridge},
46+
want: ContentTypeEventBridge,
4747
},
4848
"eventbridge s3 without object created": {
4949
event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`),
50-
want: []ContentType{ContentTypeEventBridge},
50+
want: ContentTypeEventBridge,
5151
},
5252
"sns with s3": {
5353
event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`),
54-
want: []ContentType{ContentTypeS3},
54+
want: ContentTypeS3,
5555
},
5656
"sns standalone": {
5757
event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}]}`),
58-
want: []ContentType{ContentTypeSNS},
58+
want: ContentTypeSNS,
59+
},
60+
"empty object": {
61+
event: json.RawMessage(`{}`),
62+
wantErr: true,
63+
},
64+
"unsupported source": {
65+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`),
66+
wantErr: true,
67+
},
68+
"not JSON": {
69+
event: json.RawMessage(`not json`),
70+
wantErr: true,
5971
},
60-
"sqs with direct s3": {
72+
}
73+
74+
for name, tc := range tests {
75+
t.Run(name, func(t *testing.T) {
76+
t.Parallel()
77+
78+
got, err := Parse(tc.event)
79+
80+
if tc.wantErr {
81+
require.Error(t, err)
82+
return
83+
}
84+
85+
require.NoError(t, err)
86+
assert.Equal(t, tc.want, got.ContentType)
87+
assert.NotEmpty(t, got.Payload)
88+
})
89+
}
90+
}
91+
92+
func TestSQS(t *testing.T) {
93+
t.Parallel()
94+
95+
tests := map[string]struct {
96+
event json.RawMessage
97+
want []ContentType
98+
wantErr bool
99+
}{
100+
"direct s3": {
61101
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`),
62102
want: []ContentType{ContentTypeS3},
63103
},
64-
"sqs with sns s3": {
104+
"sns s3": {
65105
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`),
66106
want: []ContentType{ContentTypeS3},
67107
},
68-
"sqs with multiple records": {
108+
"multiple records": {
69109
event: json.RawMessage(`{"Records":[` +
70110
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` +
71111
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` +
72112
`]}`),
73113
want: []ContentType{ContentTypeS3, ContentTypeS3},
74114
},
75-
"sqs with sns standalone": {
115+
"sns standalone": {
76116
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"hello world\"}"}]}`),
77117
want: []ContentType{ContentTypeSNS},
78118
},
79-
"sqs with subscription confirmation skipped": {
119+
"subscription confirmation skipped": {
80120
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"SubscriptionConfirmation\",\"Message\":\"confirm\"}"}]}`),
81121
wantErr: true,
82122
},
83-
"sqs mixed valid and unrecognized": {
123+
"mixed valid and unrecognized": {
84124
event: json.RawMessage(`{"Records":[` +
85125
`{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` +
86126
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` +
87127
`]}`),
88128
wantErr: true,
89129
},
90-
"sqs with extra fields after body": {
130+
"extra fields after body": {
91131
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}","messageId":"abc","receiptHandle":"xyz","attributes":{"ApproximateReceiveCount":"1"}}]}`),
92132
want: []ContentType{ContentTypeS3},
93133
},
94-
"sqs with malformed body json": {
134+
"malformed body json": {
95135
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"not json"}]}`),
96136
wantErr: true,
97137
},
98-
"empty object": {
99-
event: json.RawMessage(`{}`),
100-
wantErr: true,
101-
},
102-
"unsupported source": {
103-
event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`),
104-
wantErr: true,
105-
},
106-
"not JSON": {
107-
event: json.RawMessage(`not json`),
108-
wantErr: true,
109-
},
110138
}
111139

112140
for name, tc := range tests {
113141
t.Run(name, func(t *testing.T) {
114142
t.Parallel()
115143

116-
got, err := Parse(tc.event)
144+
got, err := SQS(tc.event)
117145

118146
if tc.wantErr {
119147
require.Error(t, err)
@@ -123,9 +151,9 @@ func TestParse(t *testing.T) {
123151
require.NoError(t, err)
124152
require.Len(t, got, len(tc.want))
125153

126-
for i, pe := range got {
127-
assert.Equal(t, tc.want[i], pe.ContentType)
128-
assert.NotEmpty(t, pe.Payload)
154+
for i, se := range got {
155+
assert.Equal(t, tc.want[i], se.ContentType)
156+
assert.NotEmpty(t, se.Payload)
129157
}
130158
})
131159
}

0 commit comments

Comments
 (0)