Skip to content

Commit 55fccea

Browse files
LorisFriedel and claude committed
feat(forwarder): add SQS support as event source for S3 notifications
Support S3 event notifications delivered via SQS queues (S3 -> SQS -> Lambda and S3 -> SNS -> SQS -> Lambda). The forwarder detects SQS event records, unwraps the inner S3 event from the SQS body, and delegates to the existing S3EventHandler. Each SQS record gets fresh metadata to avoid cross-contamination across batch items. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2df78ac commit 55fccea

6 files changed

Lines changed: 246 additions & 34 deletions

File tree

aws/logs_monitoring/steps/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class AwsEventType(Enum):
6262
KINESIS = "kinesis"
6363
S3 = "s3"
6464
SNS = "sns"
65+
SQS = "sqs"
6566
UNKNOWN = "unknown"
6667

6768
def __str__(self):

aws/logs_monitoring/steps/parsing.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ def parse(event, context, cache_layer):
3939
case AwsEventType.S3:
4040
s3_handler = S3EventHandler(context, metadata, cache_layer)
4141
events = s3_handler.handle(event)
42+
case AwsEventType.SQS:
43+
events = sqs_handler(event, context, cache_layer)
44+
return collect_and_count(events)
4245
case AwsEventType.EVENTBRIDGE_S3:
4346
events = eventbridge_s3_handler(event, context, metadata, cache_layer)
4447
case AwsEventType.EVENTS:
@@ -78,6 +81,8 @@ def parse_event_type(event):
7881
return AwsEventType.SNS
7982
elif str(AwsEventType.KINESIS) in record:
8083
return AwsEventType.KINESIS
84+
elif record.get("eventSource") == "aws:sqs":
85+
return AwsEventType.SQS
8186
elif str(AwsEventType.AWSLOGS) in event:
8287
return AwsEventType.AWSLOGS
8388
elif "detail" in event:
@@ -90,6 +95,52 @@ def parse_event_type(event):
9095
raise Exception("Event type not supported (see #Event supported section)")
9196

9297

98+
# Handle S3 events delivered via SQS (S3 -> SQS or S3 -> SNS -> SQS)
99+
def sqs_handler(event, context, cache_layer):
    """Yield normalized log events for every S3 notification found in an SQS batch.

    Each entry of ``event["Records"]`` is unwrapped with
    ``_extract_inner_event_from_sqs``; entries without a recognizable S3
    event are skipped. The remaining ones are handed to a new
    ``S3EventHandler`` and everything it produces is merged with that
    record's metadata before being yielded.

    Args:
        event: Lambda SQS event; must contain a "Records" list.
        context: Lambda context, forwarded to ``generate_metadata`` and
            ``S3EventHandler``.
        cache_layer: forwarded unchanged to ``S3EventHandler``.

    Yields:
        One merged dict per dict or string produced by the S3 handler.
        Strings are wrapped as ``{"message": <string>}`` first; values of
        any other type are dropped.
    """
    for sqs_record in event["Records"]:
        s3_event = _extract_inner_event_from_sqs(sqs_record)
        if s3_event is not None:
            # A new metadata dict per record: the S3 handler writes source,
            # tags and service into it, and those must not leak from one
            # batch item into the next.
            record_metadata = generate_metadata(context)
            handler = S3EventHandler(context, record_metadata, cache_layer)
            for entry in handler.handle(s3_event):
                if isinstance(entry, str):
                    entry = {"message": entry}
                elif not isinstance(entry, dict):
                    # Neither a dict nor a string: nothing to forward.
                    continue
                yield merge_dicts(entry, record_metadata)
113+
114+
115+
def _extract_inner_event_from_sqs(sqs_record):
    """Return the S3 event carried by one SQS record, or None if there is none.

    Two body layouts are recognized:

    * S3 -> SQS: the JSON body is the S3 notification itself.
    * S3 -> SNS -> SQS: the JSON body is an SNS envelope with
      ``"Type": "Notification"`` whose ``"Message"`` field holds the
      JSON-encoded S3 notification.

    Args:
        sqs_record: one entry of the SQS event's "Records" list.

    Returns:
        The decoded S3 event dict, or None when the body is missing, is not
        valid JSON, or does not hold a recognized S3 event. A warning is
        logged in every None case.
    """
    try:
        body = json.loads(sqs_record["body"])
    except (json.JSONDecodeError, KeyError, TypeError):
        logger.warning("SQS record has missing or malformed body, skipping")
        return None

    # json.loads can return any JSON value (list, string, number, null).
    # Only a JSON object can be an S3 or SNS payload, and the .get() calls
    # below would raise AttributeError on anything else.
    if isinstance(body, dict):
        # Direct S3 event: body contains Records[0].s3
        if _contains_s3_records(body):
            return body

        # SNS-wrapped S3 event: the S3 notification is a JSON string in "Message"
        if body.get("Type") == "Notification":
            try:
                message = json.loads(body.get("Message", ""))
            except (json.JSONDecodeError, TypeError):
                message = None
            # Same reasoning as for the body: "Message" may decode to a
            # non-object, which cannot be an S3 event.
            if isinstance(message, dict) and _contains_s3_records(message):
                return message

    logger.warning("SQS record body does not contain a recognized S3 event, skipping")
    return None
137+
138+
139+
def _contains_s3_records(event):
140+
records = event.get("Records")
141+
return isinstance(records, list) and len(records) > 0 and records[0].get("s3")
142+
143+
93144
# Handle S3 event over EventBridge
94145
def eventbridge_s3_handler(event, context, metadata, cache_layer):
95146
"""

aws/logs_monitoring/template.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,13 @@ Resources:
754754
Resource: "*"
755755
Effect: Allow
756756
- !Ref AWS::NoValue
757+
# Required for Lambda event source mappings that poll SQS queues
758+
- Action:
759+
- sqs:ReceiveMessage
760+
- sqs:DeleteMessage
761+
- sqs:GetQueueAttributes
762+
Resource: "*"
763+
Effect: Allow
757764
Tags:
758765
- Value: !FindInMap [Constants, DdForwarder, Version]
759766
Key: dd_forwarder_version
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
5+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
6+
"body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2024-01-15T12:00:00.000Z\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"bucket\":{\"name\":\"my-bucket\",\"arn\":\"arn:aws:s3:::my-bucket\"},\"object\":{\"key\":\"my-key.log\",\"size\":1234}}}]}",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"SentTimestamp": "1545082649636",
10+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
11+
"ApproximateFirstReceiveTimestamp": "1545082649636"
12+
},
13+
"messageAttributes": {},
14+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
15+
"eventSource": "aws:sqs",
16+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
17+
"awsRegion": "us-east-1"
18+
}
19+
]
20+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
5+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
6+
"body": "{\"Type\":\"Notification\",\"MessageId\":\"a1b2c3d4-e5f6-7890-abcd-ef1234567890\",\"TopicArn\":\"arn:aws:sns:us-east-1:123456789012:my-topic\",\"Subject\":\"Amazon S3 Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us-east-1\\\",\\\"eventTime\\\":\\\"2024-01-15T12:00:00.000Z\\\",\\\"eventName\\\":\\\"ObjectCreated:Put\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"my-bucket\\\",\\\"arn\\\":\\\"arn:aws:s3:::my-bucket\\\"},\\\"object\\\":{\\\"key\\\":\\\"my-key.log\\\",\\\"size\\\":1234}}}]}\",\"Timestamp\":\"2024-01-15T12:00:00.000Z\",\"SignatureVersion\":\"1\"}",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"SentTimestamp": "1545082649636",
10+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
11+
"ApproximateFirstReceiveTimestamp": "1545082649636"
12+
},
13+
"messageAttributes": {},
14+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
15+
"eventSource": "aws:sqs",
16+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
17+
"awsRegion": "us-east-1"
18+
}
19+
]
20+
}

aws/logs_monitoring/tests/test_parsing.py

Lines changed: 147 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import unittest
23
from unittest.mock import MagicMock, patch
34

@@ -7,6 +8,28 @@
78
from steps.parsing import parse, parse_event_type
89

910

11+
class Context:
    """Shared fake context object passed to ``parse`` by the tests in this module."""

    # Name and ARN both describe the same function, "datadog-forwarder".
    function_name = "datadog-forwarder"
    invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
    function_version = "$LATEST"
    # Kept as a string, exactly as the tests have always provided it.
    memory_limit_in_mb = "128"
18+
19+
20+
def _make_s3_event(bucket, key):
21+
return {"Records": [{"s3": {"bucket": {"name": bucket}, "object": {"key": key}}}]}
22+
23+
24+
def _make_sqs_record(body, message_id="msg-1"):
25+
return {
26+
"messageId": message_id,
27+
"body": body if isinstance(body, str) else json.dumps(body),
28+
"eventSource": "aws:sqs",
29+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:q",
30+
}
31+
32+
1033
class TestParseEventSource(unittest.TestCase):
1134
def test_aws_source_if_none_found(self):
1235
self.assertEqual(parse_event_source({}, "asdfalsfhalskjdfhalsjdf"), "aws")
@@ -187,7 +210,7 @@ def test_get_service_from_tags_removing_duplicates(self):
187210

188211
class TestParseEventType(unittest.TestCase):
189212
def test_parse_eventbridge_s3_event_type(self):
190-
"""Test that EventBridge S3 events are correctly identified as EventBridge S3 type"""
213+
"""EventBridge S3 events are correctly identified"""
191214
eventbridge_s3_event = {
192215
"version": "0",
193216
"id": "test-event-id",
@@ -202,53 +225,147 @@ def test_parse_eventbridge_s3_event_type(self):
202225
"object": {"key": "my-key.log"},
203226
},
204227
}
205-
206-
event_type = parse_event_type(eventbridge_s3_event)
207-
self.assertEqual(event_type, AwsEventType.EVENTBRIDGE_S3)
228+
self.assertEqual(parse_event_type(eventbridge_s3_event), AwsEventType.EVENTBRIDGE_S3)
208229

209230
def test_parse_direct_s3_event_type(self):
210-
"""Test that direct S3 events are still correctly identified as S3 type"""
211-
direct_s3_event = {
212-
"Records": [
213-
{
214-
"s3": {
215-
"bucket": {"name": "my-bucket"},
216-
"object": {"key": "my-key"},
217-
}
218-
}
219-
]
220-
}
221-
222-
event_type = parse_event_type(direct_s3_event)
223-
self.assertEqual(event_type, AwsEventType.S3)
231+
"""Direct S3 events are correctly identified"""
232+
self.assertEqual(
233+
parse_event_type(_make_s3_event("my-bucket", "my-key")), AwsEventType.S3
234+
)
224235

225236
def test_parse_non_s3_eventbridge_event_type(self):
226-
"""Test that non-S3 EventBridge events are identified as EVENTS type"""
237+
"""Non-S3 EventBridge events are identified as EVENTS type"""
227238
eventbridge_other_event = {
228239
"version": "0",
229240
"detail-type": "EC2 Instance State-change Notification",
230241
"source": "aws.ec2",
231242
"detail": {"instance-id": "i-1234567890abcdef0", "state": "terminated"},
232243
}
244+
self.assertEqual(parse_event_type(eventbridge_other_event), AwsEventType.EVENTS)
233245

234-
event_type = parse_event_type(eventbridge_other_event)
235-
self.assertEqual(event_type, AwsEventType.EVENTS)
246+
def test_parse_sqs_event_type(self):
247+
"""SQS events are correctly identified"""
248+
sqs_event = {"Records": [_make_sqs_record(_make_s3_event("b", "k"))]}
249+
self.assertEqual(parse_event_type(sqs_event), AwsEventType.SQS)
236250

251+
def test_direct_s3_event_not_detected_as_sqs(self):
252+
"""Direct S3 events must still be detected as S3, not SQS"""
253+
self.assertEqual(
254+
parse_event_type(_make_s3_event("my-bucket", "my-key")), AwsEventType.S3
255+
)
237256

238-
class TestEventBridgeS3Parsing(unittest.TestCase):
239-
class Context:
240-
function_version = "$LATEST"
241-
invoked_function_arn = (
242-
"arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
257+
def test_sns_event_not_detected_as_sqs(self):
258+
"""SNS events must still be detected as SNS, not SQS"""
259+
sns_event = {"Records": [{"Sns": {"Message": "hello"}}]}
260+
self.assertEqual(parse_event_type(sns_event), AwsEventType.SNS)
261+
262+
def test_kinesis_event_not_detected_as_sqs(self):
263+
"""Kinesis events must still be detected as Kinesis, not SQS"""
264+
kinesis_event = {"Records": [{"kinesis": {"data": "base64data"}}]}
265+
self.assertEqual(parse_event_type(kinesis_event), AwsEventType.KINESIS)
266+
267+
268+
class TestSQSEventParsing(unittest.TestCase):
269+
@patch("steps.parsing.S3EventHandler")
270+
def test_parse_sqs_s3_event(self, mock_s3_handler_cls):
271+
"""S3 event delivered via SQS is unwrapped and forwarded to S3EventHandler"""
272+
mock_s3_handler = mock_s3_handler_cls.return_value
273+
mock_s3_handler.handle.return_value = iter([{"message": "log line"}])
274+
275+
sqs_event = {
276+
"Records": [_make_sqs_record(_make_s3_event("my-bucket", "my-key.log"))]
277+
}
278+
279+
result = parse(sqs_event, Context(), MagicMock())
280+
281+
mock_s3_handler.handle.assert_called_once()
282+
inner_event = mock_s3_handler.handle.call_args.args[0]
283+
self.assertEqual(inner_event["Records"][0]["s3"]["bucket"]["name"], "my-bucket")
284+
self.assertEqual(len(result), 1)
285+
self.assertIn("ddsourcecategory", result[0])
286+
self.assertIn("aws", result[0])
287+
self.assertIn("invoked_function_arn", result[0]["aws"])
288+
289+
@patch("steps.parsing.S3EventHandler")
290+
def test_parse_sqs_sns_s3_event(self, mock_s3_handler_cls):
291+
"""S3 event delivered via SNS -> SQS is unwrapped and forwarded to S3EventHandler"""
292+
mock_s3_handler = mock_s3_handler_cls.return_value
293+
mock_s3_handler.handle.return_value = iter([{"message": "log line"}])
294+
295+
sns_body = {
296+
"Type": "Notification",
297+
"MessageId": "a1b2c3d4",
298+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
299+
"Message": json.dumps(_make_s3_event("sns-bucket", "sns-key.log")),
300+
}
301+
sqs_event = {"Records": [_make_sqs_record(sns_body)]}
302+
303+
result = parse(sqs_event, Context(), MagicMock())
304+
305+
mock_s3_handler.handle.assert_called_once()
306+
inner_event = mock_s3_handler.handle.call_args.args[0]
307+
self.assertEqual(
308+
inner_event["Records"][0]["s3"]["bucket"]["name"], "sns-bucket"
243309
)
244-
function_name = "datadog-forwarder"
245-
memory_limit_in_mb = "128"
310+
self.assertEqual(len(result), 1)
311+
312+
@patch("steps.parsing.S3EventHandler")
313+
def test_parse_sqs_batch_multiple_records(self, mock_s3_handler_cls):
314+
"""Multiple SQS records in a single batch are all processed"""
315+
mock_s3_handler = mock_s3_handler_cls.return_value
316+
mock_s3_handler.handle.side_effect = [
317+
iter([{"message": "line1"}]),
318+
iter([{"message": "line2"}]),
319+
]
320+
321+
sqs_event = {
322+
"Records": [
323+
_make_sqs_record(_make_s3_event("b1", "k1"), message_id="msg-1"),
324+
_make_sqs_record(_make_s3_event("b2", "k2"), message_id="msg-2"),
325+
]
326+
}
327+
328+
result = parse(sqs_event, Context(), MagicMock())
329+
330+
self.assertEqual(mock_s3_handler.handle.call_count, 2)
331+
self.assertEqual(len(result), 2)
332+
333+
@patch("steps.parsing.S3EventHandler")
334+
def test_parse_sqs_malformed_body_skipped(self, mock_s3_handler_cls):
335+
"""SQS records with malformed body are skipped without crashing"""
336+
mock_s3_handler = mock_s3_handler_cls.return_value
337+
mock_s3_handler.handle.return_value = iter([{"message": "ok"}])
246338

339+
sqs_event = {
340+
"Records": [
341+
_make_sqs_record("not valid json", message_id="bad"),
342+
_make_sqs_record(_make_s3_event("b", "k"), message_id="good"),
343+
]
344+
}
345+
346+
result = parse(sqs_event, Context(), MagicMock())
347+
348+
mock_s3_handler.handle.assert_called_once()
349+
self.assertEqual(len(result), 1)
350+
351+
@patch("steps.parsing.S3EventHandler")
352+
def test_parse_sqs_unrecognized_body_skipped(self, mock_s3_handler_cls):
353+
"""SQS records with valid JSON but unrecognized content are skipped"""
354+
mock_s3_handler = mock_s3_handler_cls.return_value
355+
356+
sqs_event = {"Records": [_make_sqs_record({"foo": "bar"})]}
357+
358+
result = parse(sqs_event, Context(), MagicMock())
359+
360+
mock_s3_handler.handle.assert_not_called()
361+
self.assertEqual(len(result), 0)
362+
363+
364+
class TestEventBridgeS3Parsing(unittest.TestCase):
247365
@patch("steps.parsing.S3EventHandler")
248366
def test_parse_normalizes_eventbridge_s3_event_before_s3_handler(
249367
self, mock_s3_handler_cls
250368
):
251-
# Arrange: handler yields one log line; we only care about the input event it received
252369
mock_s3_handler = mock_s3_handler_cls.return_value
253370
mock_s3_handler.handle.return_value = iter([{"message": "ok"}])
254371

@@ -262,12 +379,8 @@ def test_parse_normalizes_eventbridge_s3_event_before_s3_handler(
262379
},
263380
}
264381

265-
cache_layer = MagicMock()
266-
267-
# Act
268-
_ = parse(eventbridge_event, self.Context(), cache_layer)
382+
parse(eventbridge_event, Context(), MagicMock())
269383

270-
# Assert: parse() passed a canonical S3-shaped event into the S3 handler
271384
mock_s3_handler.handle.assert_called_once()
272385
(normalized_event,) = mock_s3_handler.handle.call_args.args
273386

0 commit comments

Comments
 (0)