Skip to content

Commit e102658

Browse files
committed
perf(go-forwarder): benchmarking decoder vs unmarshal on parsing
1 parent df53d6f commit e102658

8 files changed

Lines changed: 372 additions & 7 deletions

File tree

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"encoding/json"
10+
"testing"
11+
)
12+
13+
var (
14+
benchCloudWatch = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`)
15+
benchS3 = json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`)
16+
benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`)
17+
benchSQSS3 = json.RawMessage(`{"Records":[` +
18+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` +
19+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"},` +
20+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b3\"},\"object\":{\"key\":\"k3\"}}}]}"},` +
21+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b4\"},\"object\":{\"key\":\"k4\"}}}]}"},` +
22+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b5\"},\"object\":{\"key\":\"k5\"}}}]}"},` +
23+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b6\"},\"object\":{\"key\":\"k6\"}}}]}"},` +
24+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b7\"},\"object\":{\"key\":\"k7\"}}}]}"},` +
25+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b8\"},\"object\":{\"key\":\"k8\"}}}]}"},` +
26+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b9\"},\"object\":{\"key\":\"k9\"}}}]}"},` +
27+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b10\"},\"object\":{\"key\":\"k10\"}}}]}"}` +
28+
`]}`)
29+
benchSQSSNSS3 = json.RawMessage(`{"Records":[` +
30+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` +
31+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` +
32+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` +
33+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` +
34+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` +
35+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` +
36+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` +
37+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` +
38+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` +
39+
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` +
40+
`]}`)
41+
benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`)
42+
)
43+
44+
func BenchmarkParse_Decoder_CloudWatch(b *testing.B) {
45+
b.ReportAllocs()
46+
for b.Loop() {
47+
_, _ = Parse(benchCloudWatch)
48+
}
49+
}
50+
51+
func BenchmarkParse_Unmarshal_CloudWatch(b *testing.B) {
52+
b.ReportAllocs()
53+
for b.Loop() {
54+
_, _ = ParseUnmarshal(benchCloudWatch)
55+
}
56+
}
57+
58+
func BenchmarkParse_Decoder_S3(b *testing.B) {
59+
b.ReportAllocs()
60+
for b.Loop() {
61+
_, _ = Parse(benchS3)
62+
}
63+
}
64+
65+
func BenchmarkParse_Unmarshal_S3(b *testing.B) {
66+
b.ReportAllocs()
67+
for b.Loop() {
68+
_, _ = ParseUnmarshal(benchS3)
69+
}
70+
}
71+
72+
func BenchmarkParse_Decoder_SNS_S3(b *testing.B) {
73+
b.ReportAllocs()
74+
for b.Loop() {
75+
_, _ = Parse(benchSNSS3)
76+
}
77+
}
78+
79+
func BenchmarkParse_Unmarshal_SNS_S3(b *testing.B) {
80+
b.ReportAllocs()
81+
for b.Loop() {
82+
_, _ = ParseUnmarshal(benchSNSS3)
83+
}
84+
}
85+
86+
func BenchmarkParse_Decoder_SQS_S3(b *testing.B) {
87+
b.ReportAllocs()
88+
for b.Loop() {
89+
_, _ = Parse(benchSQSS3)
90+
}
91+
}
92+
93+
func BenchmarkParse_Unmarshal_SQS_S3(b *testing.B) {
94+
b.ReportAllocs()
95+
for b.Loop() {
96+
_, _ = ParseUnmarshal(benchSQSS3)
97+
}
98+
}
99+
100+
func BenchmarkParse_Decoder_SQS_SNS_S3(b *testing.B) {
101+
b.ReportAllocs()
102+
for b.Loop() {
103+
_, _ = Parse(benchSQSSNSS3)
104+
}
105+
}
106+
107+
func BenchmarkParse_Unmarshal_SQS_SNS_S3(b *testing.B) {
108+
b.ReportAllocs()
109+
for b.Loop() {
110+
_, _ = ParseUnmarshal(benchSQSSNSS3)
111+
}
112+
}
113+
114+
func BenchmarkParse_Decoder_EventBridge(b *testing.B) {
115+
b.ReportAllocs()
116+
for b.Loop() {
117+
_, _ = Parse(benchEventBridge)
118+
}
119+
}
120+
121+
func BenchmarkParse_Unmarshal_EventBridge(b *testing.B) {
122+
b.ReportAllocs()
123+
for b.Loop() {
124+
_, _ = ParseUnmarshal(benchEventBridge)
125+
}
126+
}

aws/logs_monitoring_go/internal/parsing/content.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
)
2222

2323
type ParsedEvent struct {
24-
ContentType ContentType
25-
Payload json.RawMessage
24+
ContentType ContentType
25+
Payload json.RawMessage
26+
SQSReceiptHandle string
2627
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"encoding/json"
10+
"fmt"
11+
"strings"
12+
13+
"github.com/aws/aws-lambda-go/events"
14+
)
15+
16+
type s3EventBridgeDetail struct {
17+
Bucket struct {
18+
Name string `json:"name"`
19+
} `json:"bucket"`
20+
Object struct {
21+
Key string `json:"key"`
22+
} `json:"object"`
23+
}
24+
25+
func eventBridgeUnmarshal(event json.RawMessage) ([]ParsedEvent, error) {
26+
var eventBridgeEvent events.EventBridgeEvent
27+
if err := json.Unmarshal(event, &eventBridgeEvent); err != nil {
28+
return nil, err
29+
}
30+
31+
if eventBridgeEvent.Source == eventSourceS3 && strings.Contains(eventBridgeEvent.DetailType, "Object Created") {
32+
var s3eb s3EventBridgeDetail
33+
if err := json.Unmarshal(eventBridgeEvent.Detail, &s3eb); err != nil {
34+
return nil, err
35+
}
36+
37+
payload, err := mapS3EventBridge(s3eb)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
return []ParsedEvent{{ContentType: ContentTypeS3, Payload: payload}}, nil
43+
}
44+
return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil
45+
}
46+
47+
func mapS3EventBridge(eb s3EventBridgeDetail) (json.RawMessage, error) {
48+
s3EventRecord := events.S3EventRecord{
49+
EventSource: eventSourceS3,
50+
S3: events.S3Entity{
51+
Bucket: events.S3Bucket{Name: eb.Bucket.Name},
52+
Object: events.S3Object{Key: eb.Object.Key, URLDecodedKey: eb.Object.Key},
53+
},
54+
}
55+
payload, err := json.Marshal(s3EventRecord)
56+
if err != nil {
57+
return nil, fmt.Errorf("marshal: %w", err)
58+
}
59+
60+
return payload, nil
61+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"encoding/json"
10+
"errors"
11+
"fmt"
12+
)
13+
14+
type eventDiscriminator struct {
15+
AWSLogs json.RawMessage `json:"awslogs"` // CloudWatch logs
16+
Records []struct {
17+
EventSource string `json:"eventSource"`
18+
} `json:"Records"` // S3, SQS, SNS
19+
Detail json.RawMessage `json:"detail"` // EventBridge
20+
Retry json.RawMessage `json:"retry"`
21+
}
22+
23+
type recordsDiscriminator struct {
24+
Records json.RawMessage `json:"Records"`
25+
}
26+
27+
func ParseUnmarshal(event json.RawMessage) ([]ParsedEvent, error) {
28+
var disc eventDiscriminator
29+
if err := json.Unmarshal(event, &disc); err != nil {
30+
return nil, err
31+
}
32+
33+
switch {
34+
case disc.AWSLogs != nil:
35+
return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil
36+
37+
case disc.Retry != nil:
38+
return []ParsedEvent{{ContentType: ContentTypeRetry}}, nil
39+
40+
case len(disc.Records) > 0:
41+
parsed, err := recordsUnmarshal(event, disc)
42+
if err != nil {
43+
return nil, fmt.Errorf("records: %w", err)
44+
}
45+
return parsed, nil
46+
47+
case disc.Detail != nil:
48+
parsed, err := eventBridgeUnmarshal(event)
49+
if err != nil {
50+
return nil, fmt.Errorf("eventbridge: %w", err)
51+
}
52+
return parsed, nil
53+
}
54+
55+
return nil, errors.New("unsupported event")
56+
}
57+
58+
func recordsUnmarshal(event json.RawMessage, disc eventDiscriminator) ([]ParsedEvent, error) {
59+
switch disc.Records[0].EventSource {
60+
case eventSourceS3:
61+
return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil
62+
63+
case eventSourceKinesis:
64+
return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil
65+
66+
case eventSourceSQS:
67+
parsed, err := sqsUnmarshal(event)
68+
if err != nil {
69+
return nil, fmt.Errorf("sqs: %w", err)
70+
}
71+
return parsed, nil
72+
73+
case eventSourceSNS:
74+
parsed, err := snsUnmarshal(event)
75+
if err != nil {
76+
return nil, fmt.Errorf("sns: %w", err)
77+
}
78+
return parsed, nil
79+
80+
default:
81+
return nil, fmt.Errorf("unsupported event source %q", disc.Records[0].EventSource)
82+
}
83+
}

aws/logs_monitoring_go/internal/parsing/sns.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ func parseSNS(event json.RawMessage) ([]ParsedEvent, error) {
4646

4747
inner := json.RawMessage(message)
4848
if isS3(inner) {
49-
parsed = append(parsed, ParsedEvent{ContentTypeS3, inner})
49+
parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: inner})
5050
continue
5151
}
5252

53-
parsed = append(parsed, ParsedEvent{ContentTypeSNS, record})
53+
parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: record})
5454
}
5555

5656
return parsed, nil
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"encoding/json"
10+
11+
"github.com/aws/aws-lambda-go/events"
12+
)
13+
14+
func snsUnmarshal(event json.RawMessage) ([]ParsedEvent, error) {
15+
var snsEvent events.SNSEvent
16+
if err := json.Unmarshal(event, &snsEvent); err != nil {
17+
return nil, err
18+
}
19+
20+
var parsed []ParsedEvent
21+
for _, record := range snsEvent.Records {
22+
inner := record.SNS.Message
23+
if err := json.Unmarshal([]byte(inner), &recordsDiscriminator{}); err != nil {
24+
parsed = append(parsed, ParsedEvent{ContentType: ContentTypeS3, Payload: json.RawMessage(inner)})
25+
continue
26+
}
27+
28+
parsed = append(parsed, ParsedEvent{ContentType: ContentTypeSNS, Payload: event})
29+
}
30+
31+
return parsed, nil
32+
}

aws/logs_monitoring_go/internal/parsing/sqs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func parseSQSBody(body string) (ParsedEvent, error) {
9191
}
9292
case recordsKey:
9393
if isS3(inner) {
94-
return ParsedEvent{ContentTypeS3, inner}, nil
94+
return ParsedEvent{ContentType: ContentTypeS3, Payload: inner}, nil
9595
}
9696
return ParsedEvent{}, errUnknownEvent
9797
default:
@@ -104,9 +104,9 @@ func parseSQSBody(body string) (ParsedEvent, error) {
104104
if typ == "Notification" && message != "" {
105105
msg := json.RawMessage(message)
106106
if isS3(msg) {
107-
return ParsedEvent{ContentTypeS3, msg}, nil
107+
return ParsedEvent{ContentType: ContentTypeS3, Payload: msg}, nil
108108
}
109-
return ParsedEvent{ContentTypeSNS, inner}, nil
109+
return ParsedEvent{ContentType: ContentTypeSNS, Payload: inner}, nil
110110
}
111111

112112
return ParsedEvent{}, errUnknownEvent

0 commit comments

Comments
 (0)