Skip to content

Commit 8f7c8f2

Browse files
committed
feat(go-forwarder): add EventBridge
1 parent c518f8b commit 8f7c8f2

19 files changed

Lines changed: 653 additions & 299 deletions

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ package main
88
import (
99
"context"
1010
"encoding/json"
11-
"errors"
11+
"fmt"
1212

1313
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
14-
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
15-
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
1614
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
1715
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
1816

@@ -33,24 +31,15 @@ func main() {
3331
panic(err)
3432
}
3533

36-
cwHandler := handling.NewCloudwatch(cfg)
37-
kinesisHandler := handling.NewKinesis(cfg)
38-
s3Handler := handling.NewS3(cfg)
39-
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
40-
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
41-
handling.Register(parsing.InvocationSourceS3, s3Handler)
42-
4334
lambda.Start(handleRequest(cfg))
4435
}
4536

4637
func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
4738
return func(ctx context.Context, event json.RawMessage) error {
48-
invocation := parsing.DetectInvocationSource(event)
49-
if invocation == parsing.InvocationSourceUnknown {
50-
return errors.New("unknown invocation")
39+
parsed, err := parsing.Parse(event)
40+
if err != nil {
41+
return fmt.Errorf("parse: %w", err)
5142
}
52-
53-
run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
54-
return pipeline.Start(ctx, event, run)
43+
return pipeline.Start(ctx, parsed, cfg)
5544
}
5645
}

aws/logs_monitoring_go/internal/forwarding/storage.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ const (
1212
s3Storage = "s3"
1313
)
1414

15-
func Storage(source parsing.InvocationSource) string {
16-
switch source {
17-
case parsing.InvocationSourceS3:
15+
func StorageFromContentType(contentType parsing.ContentType) string {
16+
switch contentType {
17+
case parsing.ContentTypeS3:
1818
return s3Storage
19-
case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis:
19+
case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis:
2020
return cloudwatchStorage
2121
default:
2222
return ""

aws/logs_monitoring_go/internal/handling/cloudwatch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler {
4242
}
4343
}
4444

45-
func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
45+
func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
4646
var cwEvent events.CloudwatchLogsEvent
4747
if err := json.Unmarshal(event, &cwEvent); err != nil {
4848
return fmt.Errorf("unmarshal: %w", err)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"bytes"
10+
"cmp"
11+
"context"
12+
"encoding/json"
13+
"errors"
14+
"fmt"
15+
"strings"
16+
17+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
18+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
19+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
20+
)
21+
22+
type EventBridgeHandler struct {
23+
cfg *config.Config
24+
}
25+
26+
func NewEventBridge(cfg *config.Config) *EventBridgeHandler {
27+
return &EventBridgeHandler{
28+
cfg: cfg,
29+
}
30+
}
31+
32+
func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
33+
lambdaOrigin, err := model.GetLambdaOrigin(ctx)
34+
if err != nil {
35+
return fmt.Errorf("get lambda origin: %w", err)
36+
}
37+
38+
ebSource, err := decodeEventBridgeSource(event)
39+
if err != nil {
40+
return err
41+
}
42+
source := cmp.Or(h.cfg.Source, ebSource)
43+
service := cmp.Or(h.cfg.Service, source)
44+
45+
entry := model.NewLogEntry()
46+
entry.Message = string(event)
47+
entry.Source = source
48+
entry.Service = service
49+
entry.Tags = h.cfg.Tags
50+
entry.Metadata = lambdaOrigin
51+
52+
if h.cfg.Filter.ShouldExclude(entry.Message) {
53+
return nil
54+
}
55+
56+
entry.Message = h.cfg.Scrubber.Scrub(entry.Message)
57+
return concurrent.SafeSender(ctx, out, entry)
58+
}
59+
60+
func decodeEventBridgeSource(event json.RawMessage) (string, error) {
61+
dec := json.NewDecoder(bytes.NewReader(event))
62+
63+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
64+
return "", errors.New("decode eventbridge source: expected '{'")
65+
}
66+
67+
for dec.More() {
68+
key, err := dec.Token()
69+
if err != nil {
70+
return "", fmt.Errorf("decode eventbridge source: read key: %w", err)
71+
}
72+
if key == "source" {
73+
var source string
74+
if err := dec.Decode(&source); err != nil {
75+
return "", fmt.Errorf("decode eventbridge source: %w", err)
76+
}
77+
return eventBridgeSource(source), nil
78+
}
79+
var skip json.RawMessage
80+
if err := dec.Decode(&skip); err != nil {
81+
return "", fmt.Errorf("decode eventbridge source: skip field: %w", err)
82+
}
83+
}
84+
85+
return "", nil
86+
}
87+
88+
func eventBridgeSource(source string) string {
89+
_, after, found := strings.Cut(source, ".")
90+
if found {
91+
return after
92+
}
93+
return sourceCloudwatch
94+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"encoding/json"
10+
"testing"
11+
12+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
13+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
14+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
15+
"github.com/google/go-cmp/cmp"
16+
)
17+
18+
func TestEventBridgeHandler_Handle(t *testing.T) {
19+
t.Parallel()
20+
21+
ctx := testutil.LambdaContext(t)
22+
23+
tests := map[string]struct {
24+
event json.RawMessage
25+
cfg *config.Config
26+
want []model.LogEntry
27+
wantErr bool
28+
}{
29+
"scheduled event": {
30+
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`),
31+
cfg: testutil.EmptyConfig(),
32+
want: []model.LogEntry{
33+
{
34+
Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`,
35+
Source: "events",
36+
SourceCategory: "aws",
37+
Service: "events",
38+
Metadata: testutil.LambdaOrigin(),
39+
},
40+
},
41+
},
42+
"ec2 event": {
43+
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`),
44+
cfg: testutil.EmptyConfig(),
45+
want: []model.LogEntry{
46+
{
47+
Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`,
48+
Source: "ec2",
49+
SourceCategory: "aws",
50+
Service: "ec2",
51+
Metadata: testutil.LambdaOrigin(),
52+
},
53+
},
54+
},
55+
"custom source override": {
56+
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`),
57+
cfg: &config.Config{Source: "custom-source"},
58+
want: []model.LogEntry{
59+
{
60+
Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`,
61+
Source: "custom-source",
62+
SourceCategory: "aws",
63+
Service: "custom-source",
64+
Metadata: testutil.LambdaOrigin(),
65+
},
66+
},
67+
},
68+
"invalid JSON": {
69+
event: json.RawMessage(`not json`),
70+
cfg: testutil.EmptyConfig(),
71+
wantErr: true,
72+
},
73+
}
74+
75+
for name, tc := range tests {
76+
t.Run(name, func(t *testing.T) {
77+
t.Parallel()
78+
79+
handler := NewEventBridge(tc.cfg)
80+
out := make(chan model.LogEntry, len(tc.want))
81+
82+
err := handler.Handle(ctx, tc.event, out)
83+
close(out)
84+
85+
if tc.wantErr {
86+
if err == nil {
87+
t.Fatal("expected error, got nil")
88+
}
89+
return
90+
}
91+
92+
if err != nil {
93+
t.Fatalf("unexpected error: %v", err)
94+
}
95+
96+
var got []model.LogEntry
97+
for entry := range out {
98+
got = append(got, entry)
99+
}
100+
101+
if diff := cmp.Diff(tc.want, got); diff != "" {
102+
t.Errorf("entries mismatch (-want +got):\n%s", diff)
103+
}
104+
})
105+
}
106+
}
107+
108+
func TestEventBridgeSource(t *testing.T) {
109+
t.Parallel()
110+
111+
tests := map[string]struct {
112+
source string
113+
want string
114+
}{
115+
"aws.events": {source: "aws.events", want: "events"},
116+
"aws.ec2": {source: "aws.ec2", want: "ec2"},
117+
"aws.s3": {source: "aws.s3", want: "s3"},
118+
"custom.app": {source: "custom.app", want: "app"},
119+
"no dot": {source: "nodot", want: "cloudwatch"},
120+
"empty string": {source: "", want: "cloudwatch"},
121+
}
122+
123+
for name, tc := range tests {
124+
t.Run(name, func(t *testing.T) {
125+
t.Parallel()
126+
got := eventBridgeSource(tc.source)
127+
if got != tc.want {
128+
t.Errorf("got %q, want %q", got, tc.want)
129+
}
130+
})
131+
}
132+
}

aws/logs_monitoring_go/internal/handling/handler.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,28 @@ package handling
88
import (
99
"context"
1010
"encoding/json"
11+
"fmt"
1112

13+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
1214
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
1315
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
1416
)
1517

16-
var Handlers = make(map[parsing.InvocationSource]Handler)
17-
1818
type Handler interface {
1919
Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error
2020
}
2121

22-
func Register(invocation parsing.InvocationSource, handler Handler) {
23-
Handlers[invocation] = handler
22+
func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) {
23+
switch ct {
24+
case parsing.ContentTypeCloudwatchLogs:
25+
return NewCloudwatch(cfg), nil
26+
case parsing.ContentTypeS3:
27+
return NewS3(cfg), nil
28+
case parsing.ContentTypeKinesis:
29+
return NewKinesis(cfg), nil
30+
case parsing.ContentTypeEventBridge:
31+
return NewEventBridge(cfg), nil
32+
default:
33+
return nil, fmt.Errorf("unsupported content type: %v", ct)
34+
}
2435
}

aws/logs_monitoring_go/internal/handling/kinesis.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ func NewKinesis(cfg *config.Config) *KinesisHandler {
2626
}
2727
}
2828

29-
func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
29+
func (h *KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
3030
var kinesisEvent events.KinesisEvent
3131
if err := json.Unmarshal(event, &kinesisEvent); err != nil {
3232
return fmt.Errorf("unmarshal: %w", err)
3333
}
3434

35-
cw := CloudwatchHandler(h)
35+
cw := CloudwatchHandler(*h)
3636
for i, record := range kinesisEvent.Records {
3737
cwData, err := decompressCloudwatchLogs(record.Kinesis.Data)
3838
if err != nil {

aws/logs_monitoring_go/internal/handling/s3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func NewS3(cfg *config.Config) *S3Handler {
3838
}
3939
}
4040

41-
func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
41+
func (h *S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
4242
var s3Event events.S3Event
4343
if err := json.Unmarshal(event, &s3Event); err != nil {
4444
return fmt.Errorf("unmarshal: %w", err)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import "encoding/json"
9+
10+
//go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go
11+
type ContentType int
12+
13+
const (
14+
ContentTypeUnknown ContentType = iota
15+
ContentTypeCloudwatchLogs
16+
ContentTypeS3
17+
ContentTypeKinesis
18+
ContentTypeEventBridge
19+
)
20+
21+
type ParsedEvent struct {
22+
ContentType ContentType
23+
Payload json.RawMessage
24+
}

0 commit comments

Comments
 (0)