Skip to content

Commit 63115d8

Browse files
committed
feat(go-forwarder): add SQS
1 parent 756b983 commit 63115d8

4 files changed

Lines changed: 179 additions & 5 deletions

File tree

aws/logs_monitoring_go/internal/parsing/error.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55

66
package parsing
77

8-
import "fmt"
8+
import (
9+
"errors"
10+
"fmt"
11+
)
12+
13+
var errUnknownEvent = errors.New("unknown event")
914

1015
type KeyNotFoundError struct {
1116
Key string

aws/logs_monitoring_go/internal/parsing/parse.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121

2222
eventSourceS3 = "aws:s3"
2323
eventSourceKinesis = "aws:kinesis"
24+
eventSourceSQS = "aws:sqs"
2425
eventSourceSNS = "aws:sns"
2526
)
2627

@@ -95,16 +96,26 @@ func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, erro
9596

9697
eventSource, ok := val.(string)
9798
if !ok {
98-
return nil, fmt.Errorf("eventSource value should be a string, got %T", eventSource)
99+
return nil, fmt.Errorf("eventSource value should be a string, got %T", val)
99100
}
100101

101102
switch eventSource {
102103
case eventSourceS3:
103104
return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil
104105
case eventSourceKinesis:
105106
return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil
107+
case eventSourceSQS:
108+
parsed, err := parseSQS(event)
109+
if err != nil {
110+
return nil, fmt.Errorf("sqs: %w", err)
111+
}
112+
return parsed, nil
106113
case eventSourceSNS:
107-
return parseSNS(event)
114+
parsed, err := parseSNS(event)
115+
if err != nil {
116+
return nil, fmt.Errorf("sns: %w", err)
117+
}
118+
return parsed, nil
108119
default:
109120
return nil, fmt.Errorf("unsupported event source %q", eventSource)
110121
}
@@ -163,9 +174,19 @@ func skipToRecords(dec *json.Decoder) error {
163174
}
164175

165176
func skip(dec *json.Decoder) error {
166-
var skip json.RawMessage
167-
if err := dec.Decode(&skip); err != nil {
177+
var s json.RawMessage
178+
if err := dec.Decode(&s); err != nil {
168179
return fmt.Errorf("skip: %w", err)
169180
}
170181
return nil
171182
}
183+
184+
func skipToEnd(dec *json.Decoder) error {
185+
for dec.More() {
186+
if err := skip(dec); err != nil {
187+
return err
188+
}
189+
}
190+
_, err := dec.Token()
191+
return err
192+
}

aws/logs_monitoring_go/internal/parsing/parse_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,44 @@ func TestParse(t *testing.T) {
5757
event: json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}]}`),
5858
want: []ContentType{ContentTypeSNS},
5959
},
60+
"sqs with direct s3": {
61+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}]}`),
62+
want: []ContentType{ContentTypeS3},
63+
},
64+
"sqs with sns s3": {
65+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b\\\"},\\\"object\\\":{\\\"key\\\":\\\"k\\\"}}}]}\"}"}]}`),
66+
want: []ContentType{ContentTypeS3},
67+
},
68+
"sqs with multiple records": {
69+
event: json.RawMessage(`{"Records":[` +
70+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` +
71+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"}` +
72+
`]}`),
73+
want: []ContentType{ContentTypeS3, ContentTypeS3},
74+
},
75+
"sqs with sns standalone": {
76+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"hello world\"}"}]}`),
77+
want: []ContentType{ContentTypeSNS},
78+
},
79+
"sqs with subscription confirmation skipped": {
80+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"Type\":\"SubscriptionConfirmation\",\"Message\":\"confirm\"}"}]}`),
81+
wantErr: true,
82+
},
83+
"sqs with unrecognized body skipped": {
84+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"}]}`),
85+
wantErr: true,
86+
},
87+
"sqs mixed valid and unrecognized": {
88+
event: json.RawMessage(`{"Records":[` +
89+
`{"eventSource":"aws:sqs","body":"{\"foo\":\"bar\"}"},` +
90+
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}` +
91+
`]}`),
92+
want: []ContentType{ContentTypeS3},
93+
},
94+
"sqs with malformed body json": {
95+
event: json.RawMessage(`{"Records":[{"eventSource":"aws:sqs","body":"not json"}]}`),
96+
wantErr: true,
97+
},
6098
"empty object": {
6199
event: json.RawMessage(`{}`),
62100
wantErr: true,
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"bytes"
10+
"encoding/json"
11+
"errors"
12+
"fmt"
13+
"log/slog"
14+
)
15+
16+
func parseSQS(event json.RawMessage) ([]ParsedEvent, error) {
17+
dec := json.NewDecoder(bytes.NewReader(event))
18+
if err := skipToRecords(dec); err != nil {
19+
return nil, fmt.Errorf("skip to records: %w", err)
20+
}
21+
22+
var parsed []ParsedEvent
23+
for i := 0; dec.More(); i++ {
24+
body, err := extractBody(dec)
25+
if err != nil {
26+
return nil, fmt.Errorf("extract body: %w", err)
27+
}
28+
29+
pe, err := parseSQSBody(body)
30+
if errors.Is(err, errUnknownEvent) {
31+
slog.Warn("skipping SQS record", slog.Int("index", i), slog.Any("error", err))
32+
continue
33+
}
34+
if err != nil {
35+
return nil, fmt.Errorf("parse SQS body: %w", err)
36+
}
37+
38+
parsed = append(parsed, pe)
39+
}
40+
41+
if len(parsed) == 0 {
42+
return nil, errors.New("no recognizable events in SQS batch")
43+
}
44+
return parsed, nil
45+
}
46+
47+
func extractBody(dec *json.Decoder) (string, error) {
48+
if err := skipBrace(dec); err != nil {
49+
return "", err
50+
}
51+
52+
if err := skipToKey(dec, "body"); err != nil {
53+
return "", fmt.Errorf("skip to key: %w", err)
54+
}
55+
56+
var body string
57+
if err := dec.Decode(&body); err != nil {
58+
return "", fmt.Errorf("decode: %w", err)
59+
}
60+
61+
if err := skipToEnd(dec); err != nil {
62+
return "", fmt.Errorf("skip to end: %w", err)
63+
}
64+
65+
return body, nil
66+
}
67+
68+
func parseSQSBody(body string) (ParsedEvent, error) {
69+
inner := json.RawMessage(body)
70+
dec := json.NewDecoder(bytes.NewReader(inner))
71+
72+
if err := skipBrace(dec); err != nil {
73+
return ParsedEvent{}, err
74+
}
75+
76+
var typ, message string
77+
for dec.More() {
78+
key, err := dec.Token()
79+
if err != nil {
80+
return ParsedEvent{}, err
81+
}
82+
83+
switch key {
84+
case "Type":
85+
if err := dec.Decode(&typ); err != nil {
86+
return ParsedEvent{}, fmt.Errorf("decode: %w", err)
87+
}
88+
case "Message":
89+
if err := dec.Decode(&message); err != nil {
90+
return ParsedEvent{}, fmt.Errorf("decode: %w", err)
91+
}
92+
case recordsKey:
93+
return ParsedEvent{ContentTypeS3, inner}, nil
94+
default:
95+
if err := skip(dec); err != nil {
96+
return ParsedEvent{}, err
97+
}
98+
}
99+
}
100+
101+
if typ == "Notification" && message != "" {
102+
msg := json.RawMessage(message)
103+
if isS3(msg) {
104+
return ParsedEvent{ContentTypeS3, msg}, nil
105+
}
106+
return ParsedEvent{ContentTypeSNS, inner}, nil
107+
}
108+
109+
return ParsedEvent{}, errUnknownEvent
110+
}

0 commit comments

Comments
 (0)