Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"

Expand All @@ -33,24 +31,15 @@ func main() {
panic(err)
}

cwHandler := handling.NewCloudwatch(cfg)
kinesisHandler := handling.NewKinesis(cfg)
s3Handler := handling.NewS3(cfg)
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
handling.Register(parsing.InvocationSourceS3, s3Handler)

lambda.Start(handleRequest(cfg))
}

func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
invocation := parsing.DetectInvocationSource(event)
if invocation == parsing.InvocationSourceUnknown {
return errors.New("unknown invocation")
parsed, err := parsing.Parse(event)
if err != nil {
return fmt.Errorf("parse: %w", err)
}

run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
return pipeline.Start(ctx, event, run)
return pipeline.Start(ctx, parsed, cfg)
}
}
8 changes: 4 additions & 4 deletions aws/logs_monitoring_go/internal/forwarding/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ const (
s3Storage = "s3"
)

func Storage(source parsing.InvocationSource) string {
switch source {
case parsing.InvocationSourceS3:
func StorageFromContentType(contentType parsing.ContentType) string {
switch contentType {
case parsing.ContentTypeS3:
return s3Storage
case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis:
case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis:
return cloudwatchStorage
default:
return ""
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/internal/handling/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler {
}
}

func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
var cwEvent events.CloudwatchLogsEvent
if err := json.Unmarshal(event, &cwEvent); err != nil {
return fmt.Errorf("unmarshal: %w", err)
Expand Down
94 changes: 94 additions & 0 deletions aws/logs_monitoring_go/internal/handling/eventbridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

type EventBridgeHandler struct {
cfg *config.Config
}

func NewEventBridge(cfg *config.Config) *EventBridgeHandler {
return &EventBridgeHandler{
cfg: cfg,
}
}

func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
lambdaOrigin, err := model.GetLambdaOrigin(ctx)
if err != nil {
return fmt.Errorf("get lambda origin: %w", err)
}

ebSource, err := decodeEventBridgeSource(event)
if err != nil {
return err
}
source := cmp.Or(h.cfg.Source, ebSource)
service := cmp.Or(h.cfg.Service, source)

entry := model.NewLogEntry()
entry.Message = string(event)
entry.Source = source
entry.Service = service
entry.Tags = h.cfg.Tags
entry.Metadata = lambdaOrigin

if h.cfg.Filter.ShouldExclude(entry.Message) {
return nil
}

entry.Message = h.cfg.Scrubber.Scrub(entry.Message)
return concurrent.SafeSender(ctx, out, entry)
}

func decodeEventBridgeSource(event json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(event))

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return "", errors.New("decode eventbridge source: expected '{'")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", errors.New("decode eventbridge source: expected '{'")
return "", errors.New("decode source: expected '{'")

}

for dec.More() {
key, err := dec.Token()
if err != nil {
return "", fmt.Errorf("decode eventbridge source: read key: %w", err)
}
if key == "source" {
var source string
if err := dec.Decode(&source); err != nil {
return "", fmt.Errorf("decode eventbridge source: %w", err)
}
return eventBridgeSource(source), nil
}
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return "", fmt.Errorf("decode eventbridge source: skip field: %w", err)
}
}

return "", nil
}

func eventBridgeSource(source string) string {
_, after, found := strings.Cut(source, ".")
if found {
return after
}
return sourceCloudwatch
}
132 changes: 132 additions & 0 deletions aws/logs_monitoring_go/internal/handling/eventbridge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"encoding/json"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/google/go-cmp/cmp"
)

func TestEventBridgeHandler_Handle(t *testing.T) {
t.Parallel()

ctx := testutil.LambdaContext(t)

tests := map[string]struct {
event json.RawMessage
cfg *config.Config
want []model.LogEntry
wantErr bool
}{
"scheduled event": {
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`),
cfg: testutil.EmptyConfig(),
want: []model.LogEntry{
{
Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`,
Source: "events",
SourceCategory: "aws",
Service: "events",
Metadata: testutil.LambdaOrigin(),
},
},
},
"ec2 event": {
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`),
cfg: testutil.EmptyConfig(),
want: []model.LogEntry{
{
Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`,
Source: "ec2",
SourceCategory: "aws",
Service: "ec2",
Metadata: testutil.LambdaOrigin(),
},
},
},
"custom source override": {
event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`),
cfg: &config.Config{Source: "custom-source"},
want: []model.LogEntry{
{
Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`,
Source: "custom-source",
SourceCategory: "aws",
Service: "custom-source",
Metadata: testutil.LambdaOrigin(),
},
},
},
"invalid JSON": {
event: json.RawMessage(`not json`),
cfg: testutil.EmptyConfig(),
wantErr: true,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

handler := NewEventBridge(tc.cfg)
out := make(chan model.LogEntry, len(tc.want))

err := handler.Handle(ctx, tc.event, out)
close(out)

if tc.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
}
return
}

if err != nil {
t.Fatalf("unexpected error: %v", err)
}

var got []model.LogEntry
for entry := range out {
got = append(got, entry)
}

if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("entries mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestEventBridgeSource(t *testing.T) {
t.Parallel()

tests := map[string]struct {
source string
want string
}{
"aws.events": {source: "aws.events", want: "events"},
"aws.ec2": {source: "aws.ec2", want: "ec2"},
"aws.s3": {source: "aws.s3", want: "s3"},
"custom.app": {source: "custom.app", want: "app"},
"no dot": {source: "nodot", want: "cloudwatch"},
"empty string": {source: "", want: "cloudwatch"},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
got := eventBridgeSource(tc.source)
if got != tc.want {
t.Errorf("got %q, want %q", got, tc.want)
}
})
}
}
19 changes: 15 additions & 4 deletions aws/logs_monitoring_go/internal/handling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,28 @@ package handling
import (
"context"
"encoding/json"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
)

var Handlers = make(map[parsing.InvocationSource]Handler)

type Handler interface {
Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error
}

func Register(invocation parsing.InvocationSource, handler Handler) {
Handlers[invocation] = handler
func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) {
switch ct {
case parsing.ContentTypeCloudwatchLogs:
return NewCloudwatch(cfg), nil
case parsing.ContentTypeS3:
return NewS3(cfg), nil
case parsing.ContentTypeKinesis:
return NewKinesis(cfg), nil
case parsing.ContentTypeEventBridge:
return NewEventBridge(cfg), nil
default:
return nil, fmt.Errorf("unsupported content type: %v", ct)
}
}
4 changes: 2 additions & 2 deletions aws/logs_monitoring_go/internal/handling/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ func NewKinesis(cfg *config.Config) *KinesisHandler {
}
}

func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
func (h *KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
var kinesisEvent events.KinesisEvent
if err := json.Unmarshal(event, &kinesisEvent); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}

cw := CloudwatchHandler(h)
cw := CloudwatchHandler(*h)
for i, record := range kinesisEvent.Records {
cwData, err := decompressCloudwatchLogs(record.Kinesis.Data)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/internal/handling/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewS3(cfg *config.Config) *S3Handler {
}
}

func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
func (h *S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
var s3Event events.S3Event
if err := json.Unmarshal(event, &s3Event); err != nil {
return fmt.Errorf("unmarshal: %w", err)
Expand Down
24 changes: 24 additions & 0 deletions aws/logs_monitoring_go/internal/parsing/content.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package parsing

import "encoding/json"

//go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go
type ContentType int

const (
ContentTypeUnknown ContentType = iota
ContentTypeCloudwatchLogs
ContentTypeS3
ContentTypeKinesis
ContentTypeEventBridge
)

type ParsedEvent struct {
ContentType ContentType
Payload json.RawMessage
}
Loading
Loading