Skip to content

Commit ace0bf3

Browse files
authored
Merge pull request #2 from blockmar/log-forwarder-eventbridge-s3-support-feedback
Log forwarder eventbridge s3 support feedback
2 parents 1bfcde5 + c905735 commit ace0bf3

4 files changed

Lines changed: 85 additions & 138 deletions

File tree

aws/logs_monitoring/steps/handlers/s3_handler.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -58,42 +58,6 @@ def __init__(self, context, metadata, cache_layer):
5858
# a private data store for event attributes
5959
self.data_store = S3EventDataStore()
6060

61-
def _transform_eventbridge_to_s3_format(self, event):
62-
"""
63-
Transform EventBridge S3 event to standard S3 event format.
64-
65-
EventBridge format:
66-
{
67-
"version": "0",
68-
"detail-type": "Object Created",
69-
"source": "aws.s3",
70-
"detail": {
71-
"bucket": {"name": "bucket-name"},
72-
"object": {"key": "object-key", "size": 1234}
73-
}
74-
}
75-
76-
Standard S3 format:
77-
{
78-
"Records": [{
79-
"s3": {
80-
"bucket": {"name": "bucket-name"},
81-
"object": {"key": "object-key"}
82-
}
83-
}]
84-
}
85-
"""
86-
return {
87-
"Records": [{
88-
"s3": {
89-
"bucket": event["detail"]["bucket"],
90-
"object": {
91-
"key": event["detail"]["object"]["key"]
92-
}
93-
}
94-
}]
95-
}
96-
9761
def handle(self, event):
9862
event = self._extract_event(event)
9963

@@ -109,9 +73,6 @@ def handle(self, event):
10973
yield from self._get_structured_lines_for_s3_handler()
11074

11175
def _extract_event(self, event):
112-
# if this is an EventBridge S3 event, transform it to standard S3 format
113-
if event.get("source") == "aws.s3" and "detail" in event:
114-
event = self._transform_eventbridge_to_s3_format(event)
11576
# if this is a S3 event carried in a SNS message, extract it and override the event
11677
if "Sns" in event.get("Records")[0]:
11778
event = json.loads(event.get("Records")[0].get("Sns").get("Message"))

aws/logs_monitoring/steps/parsing.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def parse(event, context, cache_layer):
3737
events = aws_handler.handle(event)
3838
return collect_and_count(events)
3939
case AwsEventType.S3:
40+
event = _reformat_eventbridge_s3_event(event)
4041
s3_handler = S3EventHandler(context, metadata, cache_layer)
4142
events = s3_handler.handle(event)
4243
case AwsEventType.EVENTS:
@@ -86,6 +87,47 @@ def parse_event_type(event):
8687
raise Exception("Event type not supported (see #Event supported section)")
8788

8889

90+
def _reformat_eventbridge_s3_event(event):
91+
"""
92+
Transform EventBridge S3 event to standard S3 event format.
93+
94+
EventBridge format:
95+
{
96+
"version": "0",
97+
"detail-type": "Object Created",
98+
"source": "aws.s3",
99+
"detail": {
100+
"bucket": {"name": "bucket-name"},
101+
"object": {"key": "object-key", "size": 1234}
102+
}
103+
}
104+
105+
Standard S3 format:
106+
{
107+
"Records": [{
108+
"s3": {
109+
"bucket": {"name": "bucket-name"},
110+
"object": {"key": "object-key"}
111+
}
112+
}]
113+
}
114+
"""
115+
if event.get("source") == "aws.s3" and "detail" in event:
116+
return {
117+
"Records": [
118+
{
119+
"s3": {
120+
"bucket": event["detail"]["bucket"],
121+
"object": {
122+
"key": event["detail"]["object"]["key"],
123+
},
124+
}
125+
}
126+
]
127+
}
128+
return event
129+
130+
89131
# Handle Cloudwatch Events
90132
def cwevent_handler(event, metadata):
91133
# Set the source on the log

aws/logs_monitoring/tests/test_parsing.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import unittest
2+
from unittest.mock import MagicMock, patch
23

34
from settings import DD_CUSTOM_TAGS, DD_SOURCE
45
from steps.common import get_service_from_tags_and_remove_duplicates, parse_event_source
56
from steps.enums import AwsEventSource
7+
from steps.parsing import parse
68

79

810
class TestParseEventSource(unittest.TestCase):
@@ -183,5 +185,46 @@ def test_get_service_from_tags_removing_duplicates(self):
183185
)
184186

185187

188+
class TestEventBridgeS3Parsing(unittest.TestCase):
189+
class Context:
190+
function_version = "$LATEST"
191+
invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
192+
function_name = "datadog-forwarder"
193+
memory_limit_in_mb = "128"
194+
195+
@patch("steps.parsing.S3EventHandler")
196+
def test_parse_normalizes_eventbridge_s3_event_before_s3_handler(self, mock_s3_handler_cls):
197+
# Arrange: handler yields one log line; we only care about the input event it received
198+
mock_s3_handler = mock_s3_handler_cls.return_value
199+
mock_s3_handler.handle.return_value = iter([{"message": "ok"}])
200+
201+
eventbridge_event = {
202+
"version": "0",
203+
"detail-type": "Object Created",
204+
"source": "aws.s3",
205+
"detail": {
206+
"bucket": {"name": "my-bucket"},
207+
"object": {"key": "my-key.log", "size": 1234},
208+
},
209+
}
210+
211+
cache_layer = MagicMock()
212+
213+
# Act
214+
_ = parse(eventbridge_event, self.Context(), cache_layer)
215+
216+
# Assert: parse() passed a canonical S3-shaped event into the S3 handler
217+
mock_s3_handler.handle.assert_called_once()
218+
(normalized_event,) = mock_s3_handler.handle.call_args.args
219+
220+
self.assertIn("Records", normalized_event)
221+
self.assertEqual(
222+
normalized_event["Records"][0]["s3"]["bucket"]["name"], "my-bucket"
223+
)
224+
self.assertEqual(
225+
normalized_event["Records"][0]["s3"]["object"]["key"], "my-key.log"
226+
)
227+
228+
186229
if __name__ == "__main__":
187230
unittest.main()

aws/logs_monitoring/tests/test_s3_handler.py

Lines changed: 0 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -325,105 +325,6 @@ def test_non_vpc_flowlog_includes_first_line(self):
325325
self.assertEqual(len(structured_lines), 3)
326326
self.assertEqual(structured_lines[0]["message"], "first line of data")
327327

328-
@patch("steps.handlers.s3_handler.boto3")
329-
def test_eventbridge_s3_object_created_event(self, mock_boto3):
330-
"""Test EventBridge S3 Object Created event is transformed and processed correctly"""
331-
mock_s3_client = MagicMock()
332-
mock_s3_client.get_object.return_value = {
333-
"Body": MagicMock(read=MagicMock(return_value=b"test log line"))
334-
}
335-
mock_boto3.client.return_value = mock_s3_client
336-
337-
eventbridge_event = {
338-
"version": "0",
339-
"id": "test-event-id",
340-
"detail-type": "Object Created",
341-
"source": "aws.s3",
342-
"account": "123456789012",
343-
"time": "2024-01-15T12:00:00Z",
344-
"region": "us-east-1",
345-
"resources": ["arn:aws:s3:::my-bucket"],
346-
"detail": {
347-
"version": "0",
348-
"bucket": {"name": "my-bucket"},
349-
"object": {
350-
"key": "my-key.log",
351-
"size": 1234,
352-
"etag": "abc123",
353-
"sequencer": "xyz789",
354-
},
355-
"request-id": "request-id",
356-
"requester": "123456789012",
357-
"source-ip-address": "1.2.3.4",
358-
"reason": "PutObject",
359-
},
360-
}
361-
362-
logs = list(self.s3_handler.handle(eventbridge_event))
363-
364-
self.assertEqual(len(logs), 1)
365-
self.assertEqual(logs[0]["message"], "test log line")
366-
self.assertEqual(logs[0]["aws"]["s3"]["bucket"], "my-bucket")
367-
self.assertEqual(logs[0]["aws"]["s3"]["key"], "my-key.log")
368-
mock_s3_client.get_object.assert_called_once_with(
369-
Bucket="my-bucket", Key="my-key.log"
370-
)
371-
372-
@patch("steps.handlers.s3_handler.boto3")
373-
def test_eventbridge_s3_with_gzip_file(self, mock_boto3):
374-
"""Test EventBridge S3 event with gzipped file"""
375-
mock_s3_client = MagicMock()
376-
gzip_data = gzip.compress(b"gzipped log line")
377-
mock_s3_client.get_object.return_value = {
378-
"Body": MagicMock(read=MagicMock(return_value=gzip_data))
379-
}
380-
mock_boto3.client.return_value = mock_s3_client
381-
382-
eventbridge_event = {
383-
"version": "0",
384-
"detail-type": "Object Created",
385-
"source": "aws.s3",
386-
"detail": {
387-
"bucket": {"name": "my-bucket"},
388-
"object": {"key": "my-key.log.gz"},
389-
},
390-
}
391-
392-
logs = list(self.s3_handler.handle(eventbridge_event))
393-
394-
self.assertEqual(len(logs), 1)
395-
self.assertEqual(logs[0]["message"], "gzipped log line")
396-
397-
@patch("steps.handlers.s3_handler.boto3")
398-
def test_eventbridge_s3_cloudtrail(self, mock_boto3):
399-
"""Test EventBridge S3 event with CloudTrail logs"""
400-
mock_s3_client = MagicMock()
401-
cloudtrail_data = {
402-
"Records": [
403-
{"eventVersion": "1.05", "eventName": "AssumeRole", "eventSource": "sts.amazonaws.com"}
404-
]
405-
}
406-
mock_s3_client.get_object.return_value = {
407-
"Body": MagicMock(read=MagicMock(return_value=gzip.compress(bytes(str(cloudtrail_data).replace("'", '"'), "utf-8"))))
408-
}
409-
mock_boto3.client.return_value = mock_s3_client
410-
411-
eventbridge_event = {
412-
"version": "0",
413-
"detail-type": "Object Created",
414-
"source": "aws.s3",
415-
"detail": {
416-
"bucket": {"name": "my-bucket"},
417-
"object": {"key": "123456779121_CloudTrail_eu-west-3_20180707T1735Z_abcdefghi0MCRL2O.json.gz"},
418-
},
419-
}
420-
421-
logs = list(self.s3_handler.handle(eventbridge_event))
422-
423-
self.assertEqual(len(logs), 1)
424-
self.assertEqual(logs[0]["eventName"], "AssumeRole")
425-
self.assertEqual(logs[0]["aws"]["s3"]["bucket"], "my-bucket")
426-
427328
@patch("steps.handlers.s3_handler.boto3")
428329
def test_backward_compatibility_direct_s3(self, mock_boto3):
429330
"""Test that direct S3 events still work after EventBridge changes"""

0 commit comments

Comments
 (0)