-
Notifications
You must be signed in to change notification settings - Fork 390
[AWSINTS-3517] feat(go-forwarder): add EventBridge #1119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ndakkoune
merged 6 commits into
nabil.dakkoune/go-forwarder
from
nabil.dakkoune/AWSINTS-3517
May 11, 2026
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
8f7c8f2
feat(go-forwarder): add EventBridge
ndakkoune 62da5c3
declare const keys
ndakkoune 1e73283
improve error handling, reading and struct attributes when creating
ndakkoune d49fea1
inverting condition
ndakkoune 47840bf
comment
ndakkoune 782854e
error wrapping
ndakkoune File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2026-Present Datadog, Inc. | ||
|
|
||
| package handling | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "cmp" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "strings" | ||
|
|
||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" | ||
| ) | ||
|
|
||
| type EventBridgeHandler struct { | ||
| cfg *config.Config | ||
| } | ||
|
|
||
| func NewEventBridge(cfg *config.Config) *EventBridgeHandler { | ||
| return &EventBridgeHandler{ | ||
| cfg: cfg, | ||
| } | ||
| } | ||
|
|
||
| func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { | ||
| lambdaOrigin, err := model.GetLambdaOrigin(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("get lambda origin: %w", err) | ||
| } | ||
|
|
||
| ebSource, err := decodeEventBridgeSource(event) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| source := cmp.Or(h.cfg.Source, ebSource) | ||
| service := cmp.Or(h.cfg.Service, source) | ||
|
|
||
| entry := model.NewLogEntry() | ||
| entry.Message = string(event) | ||
| entry.Source = source | ||
| entry.Service = service | ||
| entry.Tags = h.cfg.Tags | ||
| entry.Metadata = lambdaOrigin | ||
|
|
||
| if h.cfg.Filter.ShouldExclude(entry.Message) { | ||
| return nil | ||
| } | ||
|
|
||
| entry.Message = h.cfg.Scrubber.Scrub(entry.Message) | ||
| return concurrent.SafeSender(ctx, out, entry) | ||
| } | ||
|
|
||
| func decodeEventBridgeSource(event json.RawMessage) (string, error) { | ||
| dec := json.NewDecoder(bytes.NewReader(event)) | ||
|
|
||
| if t, err := dec.Token(); err != nil || t != json.Delim('{') { | ||
| return "", errors.New("decode eventbridge source: expected '{'") | ||
| } | ||
|
|
||
| for dec.More() { | ||
| key, err := dec.Token() | ||
| if err != nil { | ||
| return "", fmt.Errorf("decode eventbridge source: read key: %w", err) | ||
| } | ||
| if key == "source" { | ||
| var source string | ||
| if err := dec.Decode(&source); err != nil { | ||
| return "", fmt.Errorf("decode eventbridge source: %w", err) | ||
| } | ||
| return eventBridgeSource(source), nil | ||
| } | ||
| var skip json.RawMessage | ||
| if err := dec.Decode(&skip); err != nil { | ||
| return "", fmt.Errorf("decode eventbridge source: skip field: %w", err) | ||
| } | ||
| } | ||
|
|
||
| return "", nil | ||
| } | ||
|
|
||
| func eventBridgeSource(source string) string { | ||
| _, after, found := strings.Cut(source, ".") | ||
| if found { | ||
| return after | ||
| } | ||
| return sourceCloudwatch | ||
| } | ||
132 changes: 132 additions & 0 deletions
132
aws/logs_monitoring_go/internal/handling/eventbridge_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2026-Present Datadog, Inc. | ||
|
|
||
| package handling | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "testing" | ||
|
|
||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" | ||
| "github.com/google/go-cmp/cmp" | ||
| ) | ||
|
|
||
| func TestEventBridgeHandler_Handle(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| ctx := testutil.LambdaContext(t) | ||
|
|
||
| tests := map[string]struct { | ||
| event json.RawMessage | ||
| cfg *config.Config | ||
| want []model.LogEntry | ||
| wantErr bool | ||
| }{ | ||
| "scheduled event": { | ||
| event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), | ||
| cfg: testutil.EmptyConfig(), | ||
| want: []model.LogEntry{ | ||
| { | ||
| Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, | ||
| Source: "events", | ||
| SourceCategory: "aws", | ||
| Service: "events", | ||
| Metadata: testutil.LambdaOrigin(), | ||
| }, | ||
| }, | ||
| }, | ||
| "ec2 event": { | ||
| event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`), | ||
| cfg: testutil.EmptyConfig(), | ||
| want: []model.LogEntry{ | ||
| { | ||
| Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`, | ||
| Source: "ec2", | ||
| SourceCategory: "aws", | ||
| Service: "ec2", | ||
| Metadata: testutil.LambdaOrigin(), | ||
| }, | ||
| }, | ||
| }, | ||
| "custom source override": { | ||
| event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), | ||
| cfg: &config.Config{Source: "custom-source"}, | ||
| want: []model.LogEntry{ | ||
| { | ||
| Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, | ||
| Source: "custom-source", | ||
| SourceCategory: "aws", | ||
| Service: "custom-source", | ||
| Metadata: testutil.LambdaOrigin(), | ||
| }, | ||
| }, | ||
| }, | ||
| "invalid JSON": { | ||
| event: json.RawMessage(`not json`), | ||
| cfg: testutil.EmptyConfig(), | ||
| wantErr: true, | ||
| }, | ||
| } | ||
|
|
||
| for name, tc := range tests { | ||
| t.Run(name, func(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| handler := NewEventBridge(tc.cfg) | ||
| out := make(chan model.LogEntry, len(tc.want)) | ||
|
|
||
| err := handler.Handle(ctx, tc.event, out) | ||
| close(out) | ||
|
|
||
| if tc.wantErr { | ||
| if err == nil { | ||
| t.Fatal("expected error, got nil") | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if err != nil { | ||
| t.Fatalf("unexpected error: %v", err) | ||
| } | ||
|
|
||
| var got []model.LogEntry | ||
| for entry := range out { | ||
| got = append(got, entry) | ||
| } | ||
|
|
||
| if diff := cmp.Diff(tc.want, got); diff != "" { | ||
| t.Errorf("entries mismatch (-want +got):\n%s", diff) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestEventBridgeSource(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| tests := map[string]struct { | ||
| source string | ||
| want string | ||
| }{ | ||
| "aws.events": {source: "aws.events", want: "events"}, | ||
| "aws.ec2": {source: "aws.ec2", want: "ec2"}, | ||
| "aws.s3": {source: "aws.s3", want: "s3"}, | ||
| "custom.app": {source: "custom.app", want: "app"}, | ||
| "no dot": {source: "nodot", want: "cloudwatch"}, | ||
| "empty string": {source: "", want: "cloudwatch"}, | ||
| } | ||
|
|
||
| for name, tc := range tests { | ||
| t.Run(name, func(t *testing.T) { | ||
| t.Parallel() | ||
| got := eventBridgeSource(tc.source) | ||
| if got != tc.want { | ||
| t.Errorf("got %q, want %q", got, tc.want) | ||
| } | ||
| }) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2026-Present Datadog, Inc. | ||
|
|
||
| package parsing | ||
|
|
||
| import "encoding/json" | ||
|
|
||
| //go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go | ||
| type ContentType int | ||
|
|
||
| const ( | ||
| ContentTypeUnknown ContentType = iota | ||
| ContentTypeCloudwatchLogs | ||
| ContentTypeS3 | ||
| ContentTypeKinesis | ||
| ContentTypeEventBridge | ||
| ) | ||
|
|
||
| type ParsedEvent struct { | ||
| ContentType ContentType | ||
| Payload json.RawMessage | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.