Skip to content

Commit 2df78ac

Browse files
blockmardomalessige0Aja
authored
Add support for S3 Object Created events distributed using Eventbridge (#1052)
* Add support for S3 Object Created events distrubuted using Eventbridge * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Update aws/logs_monitoring/README.md Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> * Moved event parsing/rewrite login to parsing.py * Refactored tests not to be end-to-end * Refactored EventBridge S3 events to a new event type. Extracted handler, similar to kinesis. * Apply suggestion from @blockmar * Update test_s3_handler.py Removed added tests, since s3_handler.py is no longer modified. * Update aws/logs_monitoring/steps/parsing.py Co-authored-by: Georgi <georgi.ajaeiya@datadoghq.com> * Black code formatting --------- Co-authored-by: domalessi <111786334+domalessi@users.noreply.github.com> Co-authored-by: Georgi <georgi.ajaeiya@datadoghq.com>
1 parent a0f9539 commit 2df78ac

4 files changed

Lines changed: 234 additions & 1 deletion

File tree

aws/logs_monitoring/README.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,88 @@ In case you encounter the following error when creating S3 triggers, we recommen
290290
An error occurred when creating the trigger: Configuration is ambiguously defined. Cannot have overlapping suffixes in two rules if the prefixes are overlapping for the same event type.
291291
```
292292

293+
### Using EventBridge for S3 event notifications
294+
295+
The Datadog Forwarder supports S3 event notifications delivered through Amazon EventBridge, in addition to direct S3 event notifications. EventBridge provides advanced filtering and supports more complex integration patterns.
296+
297+
#### Supported event types
298+
299+
- `Object Created` events from S3 delivered through EventBridge
300+
301+
#### Configuration
302+
303+
To use EventBridge for S3 event notifications:
304+
305+
1. **Enable EventBridge notifications on your S3 bucket:**
306+
307+
Using AWS CLI:
308+
```bash
309+
aws s3api put-bucket-notification-configuration \
310+
--bucket my-bucket \
311+
--notification-configuration '{
312+
"EventBridgeConfiguration": {}
313+
}'
314+
```
315+
316+
Using the AWS Console:
317+
- Navigate to your S3 bucket.
318+
- Go to **Properties** > **Event notifications**.
319+
- Enable **Amazon EventBridge**.
320+
321+
2. **Create an EventBridge rule to trigger the Forwarder:**
322+
323+
Using AWS CLI:
324+
```bash
325+
aws events put-rule \
326+
--name s3-logs-to-datadog \
327+
--event-pattern '{
328+
"source": ["aws.s3"],
329+
"detail-type": ["Object Created"]
330+
}'
331+
332+
aws events put-targets \
333+
--rule s3-logs-to-datadog \
334+
--targets "Id"="1","Arn"="arn:aws:lambda:REGION:ACCOUNT:function:datadog-forwarder"
335+
```
336+
337+
Using the AWS Console:
338+
- Navigate to Amazon EventBridge.
339+
- Create a new rule.
340+
- Set **Event source** to `aws.s3`.
341+
- Set **Detail type** to `Object Created`.
342+
- Add your Datadog Forwarder Lambda as a target.
343+
344+
3. **Add advanced filtering (optional):**
345+
346+
You can filter events by bucket name or object key patterns:
347+
```json
348+
{
349+
"source": ["aws.s3"],
350+
"detail-type": ["Object Created"],
351+
"detail": {
352+
"bucket": {
353+
"name": ["my-logs-bucket"]
354+
},
355+
"object": {
356+
"key": [{
357+
"prefix": "logs/"
358+
}]
359+
}
360+
}
361+
}
362+
```
363+
364+
#### Benefits of using EventBridge
365+
366+
- **Advanced filtering**: Filter events based on bucket names, object key patterns, object size, and more
367+
- **Multiple targets**: Route the same S3 events to multiple Lambda functions or services
368+
- **Cross-account routing**: Forward events across AWS accounts
369+
- **Better delivery guarantees**: Use built-in retry and dead-letter queue capabilities
370+
371+
#### Backward compatibility
372+
373+
Direct S3 event notifications and SNS-wrapped S3 events continue to work without any changes. You can use EventBridge alongside existing triggers.
374+
293375
## Contributing
294376

295377
We love pull requests. Here's a quick guide.

aws/logs_monitoring/steps/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __str__(self):
5858
class AwsEventType(Enum):
5959
AWSLOGS = "awslogs"
6060
EVENTS = "events"
61+
EVENTBRIDGE_S3 = "eventbridge_s3"
6162
KINESIS = "kinesis"
6263
S3 = "s3"
6364
SNS = "sns"

aws/logs_monitoring/steps/parsing.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ def parse(event, context, cache_layer):
3939
case AwsEventType.S3:
4040
s3_handler = S3EventHandler(context, metadata, cache_layer)
4141
events = s3_handler.handle(event)
42+
case AwsEventType.EVENTBRIDGE_S3:
43+
events = eventbridge_s3_handler(event, context, metadata, cache_layer)
4244
case AwsEventType.EVENTS:
4345
events = cwevent_handler(event, metadata)
4446
case AwsEventType.SNS:
@@ -79,10 +81,61 @@ def parse_event_type(event):
7981
elif str(AwsEventType.AWSLOGS) in event:
8082
return AwsEventType.AWSLOGS
8183
elif "detail" in event:
84+
# Check if this is an EventBridge S3 event
85+
if event.get("source", "") == "aws.s3" and "Object Created" in event.get(
86+
"detail-type", ""
87+
):
88+
return AwsEventType.EVENTBRIDGE_S3
8289
return AwsEventType.EVENTS
8390
raise Exception("Event type not supported (see #Event supported section)")
8491

8592

93+
# Handle S3 event over EventBridge
94+
def eventbridge_s3_handler(event, context, metadata, cache_layer):
95+
"""
96+
Transform EventBridge S3 event to standard S3 event format.
97+
98+
EventBridge format:
99+
{
100+
"version": "0",
101+
"detail-type": "Object Created",
102+
"source": "aws.s3",
103+
"detail": {
104+
"bucket": {"name": "bucket-name"},
105+
"object": {"key": "object-key", "size": 1234}
106+
}
107+
}
108+
109+
Standard S3 format:
110+
{
111+
"Records": [{
112+
"s3": {
113+
"bucket": {"name": "bucket-name"},
114+
"object": {"key": "object-key"}
115+
}
116+
}]
117+
}
118+
"""
119+
120+
def reformat_eventbridge_s3_event(event):
121+
return {
122+
"Records": [
123+
{
124+
"s3": {
125+
"bucket": event["detail"]["bucket"],
126+
"object": {
127+
"key": event["detail"]["object"]["key"],
128+
},
129+
}
130+
}
131+
]
132+
}
133+
134+
event = reformat_eventbridge_s3_event(event)
135+
s3_handler = S3EventHandler(context, metadata, cache_layer)
136+
return s3_handler.handle(event)
137+
138+
86139
# Handle Cloudwatch Events
87140
def cwevent_handler(event, metadata):
88141
# Set the source on the log

aws/logs_monitoring/tests/test_parsing.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import unittest
2+
from unittest.mock import MagicMock, patch
23

34
from settings import DD_CUSTOM_TAGS, DD_SOURCE
45
from steps.common import get_service_from_tags_and_remove_duplicates, parse_event_source
5-
from steps.enums import AwsEventSource
6+
from steps.enums import AwsEventSource, AwsEventType
7+
from steps.parsing import parse, parse_event_type
68

79

810
class TestParseEventSource(unittest.TestCase):
@@ -183,5 +185,100 @@ def test_get_service_from_tags_removing_duplicates(self):
183185
)
184186

185187

188+
class TestParseEventType(unittest.TestCase):
189+
def test_parse_eventbridge_s3_event_type(self):
190+
"""Test that EventBridge S3 events are correctly identified as EventBridge S3 type"""
191+
eventbridge_s3_event = {
192+
"version": "0",
193+
"id": "test-event-id",
194+
"detail-type": "Object Created",
195+
"source": "aws.s3",
196+
"account": "123456789012",
197+
"time": "2024-01-15T12:00:00Z",
198+
"region": "us-east-1",
199+
"resources": ["arn:aws:s3:::my-bucket"],
200+
"detail": {
201+
"bucket": {"name": "my-bucket"},
202+
"object": {"key": "my-key.log"},
203+
},
204+
}
205+
206+
event_type = parse_event_type(eventbridge_s3_event)
207+
self.assertEqual(event_type, AwsEventType.EVENTBRIDGE_S3)
208+
209+
def test_parse_direct_s3_event_type(self):
210+
"""Test that direct S3 events are still correctly identified as S3 type"""
211+
direct_s3_event = {
212+
"Records": [
213+
{
214+
"s3": {
215+
"bucket": {"name": "my-bucket"},
216+
"object": {"key": "my-key"},
217+
}
218+
}
219+
]
220+
}
221+
222+
event_type = parse_event_type(direct_s3_event)
223+
self.assertEqual(event_type, AwsEventType.S3)
224+
225+
def test_parse_non_s3_eventbridge_event_type(self):
226+
"""Test that non-S3 EventBridge events are identified as EVENTS type"""
227+
eventbridge_other_event = {
228+
"version": "0",
229+
"detail-type": "EC2 Instance State-change Notification",
230+
"source": "aws.ec2",
231+
"detail": {"instance-id": "i-1234567890abcdef0", "state": "terminated"},
232+
}
233+
234+
event_type = parse_event_type(eventbridge_other_event)
235+
self.assertEqual(event_type, AwsEventType.EVENTS)
236+
237+
238+
class TestEventBridgeS3Parsing(unittest.TestCase):
239+
class Context:
240+
function_version = "$LATEST"
241+
invoked_function_arn = (
242+
"arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
243+
)
244+
function_name = "datadog-forwarder"
245+
memory_limit_in_mb = "128"
246+
247+
@patch("steps.parsing.S3EventHandler")
248+
def test_parse_normalizes_eventbridge_s3_event_before_s3_handler(
249+
self, mock_s3_handler_cls
250+
):
251+
# Arrange: handler yields one log line; we only care about the input event it received
252+
mock_s3_handler = mock_s3_handler_cls.return_value
253+
mock_s3_handler.handle.return_value = iter([{"message": "ok"}])
254+
255+
eventbridge_event = {
256+
"version": "0",
257+
"detail-type": "Object Created",
258+
"source": "aws.s3",
259+
"detail": {
260+
"bucket": {"name": "my-bucket"},
261+
"object": {"key": "my-key.log", "size": 1234},
262+
},
263+
}
264+
265+
cache_layer = MagicMock()
266+
267+
# Act
268+
_ = parse(eventbridge_event, self.Context(), cache_layer)
269+
270+
# Assert: parse() passed a canonical S3-shaped event into the S3 handler
271+
mock_s3_handler.handle.assert_called_once()
272+
(normalized_event,) = mock_s3_handler.handle.call_args.args
273+
274+
self.assertIn("Records", normalized_event)
275+
self.assertEqual(
276+
normalized_event["Records"][0]["s3"]["bucket"]["name"], "my-bucket"
277+
)
278+
self.assertEqual(
279+
normalized_event["Records"][0]["s3"]["object"]["key"], "my-key.log"
280+
)
281+
282+
186283
if __name__ == "__main__":
187284
unittest.main()

0 commit comments

Comments
 (0)