Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions aws/logs_monitoring_go/internal/handling/eventbridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEventBridgeHandler_Handle(t *testing.T) {
Expand Down Expand Up @@ -83,24 +84,18 @@ func TestEventBridgeHandler_Handle(t *testing.T) {
close(out)

if tc.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
}
require.Error(t, err)
return
}

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
require.NoError(t, err)

var got []model.LogEntry
for entry := range out {
got = append(got, entry)
}

if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("entries mismatch (-want +got):\n%s", diff)
}
assert.Equal(t, tc.want, got)
})
}
}
Expand All @@ -124,9 +119,7 @@ func TestEventBridgeSource(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
got := eventBridgeSource(tc.source)
if got != tc.want {
t.Errorf("got %q, want %q", got, tc.want)
}
assert.Equal(t, tc.want, got)
})
}
}
2 changes: 2 additions & 0 deletions aws/logs_monitoring_go/internal/handling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) {
return NewKinesis(cfg), nil
case parsing.ContentTypeEventBridge:
return NewEventBridge(cfg), nil
case parsing.ContentTypeSNS:
return NewSNS(cfg), nil
default:
return nil, fmt.Errorf("unsupported content type: %v", ct)
}
Expand Down
52 changes: 52 additions & 0 deletions aws/logs_monitoring_go/internal/handling/sns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"cmp"
"context"
"encoding/json"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

type SNSHandler struct {
cfg *config.Config
}

func NewSNS(cfg *config.Config) *SNSHandler {
return &SNSHandler{
cfg: cfg,
}
}

func (h *SNSHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
lambdaOrigin, err := model.GetLambdaOrigin(ctx)
if err != nil {
return fmt.Errorf("get lambda origin: %w", err)
}

message := string(event)
if h.cfg.Filter.ShouldExclude(message) {
return nil
}

source := cmp.Or(h.cfg.Source, sourceSNS)
service := cmp.Or(h.cfg.Service, source)

entry := model.NewLogEntry()
entry.Message = message
entry.Source = source
entry.Service = service
entry.Tags = h.cfg.Tags
entry.Metadata = lambdaOrigin

entry.Message = h.cfg.Scrubber.Scrub(entry.Message)
return concurrent.SafeSender(ctx, out, entry)
}
92 changes: 92 additions & 0 deletions aws/logs_monitoring_go/internal/handling/sns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"encoding/json"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSNSHandler_Handle(t *testing.T) {
t.Parallel()

ctx := testutil.LambdaContext(t)

snsEvent := json.RawMessage(`{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"hello world","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic"}}`)

tests := map[string]struct {
event json.RawMessage
cfg *config.Config
want []model.LogEntry
}{
"basic event": {
event: snsEvent,
cfg: testutil.EmptyConfig(),
want: []model.LogEntry{
{
Message: string(snsEvent),
Source: "sns",
SourceCategory: "aws",
Service: "sns",
Metadata: testutil.LambdaOrigin(),
},
},
},
"custom source override": {
event: snsEvent,
cfg: &config.Config{Source: "custom-source"},
want: []model.LogEntry{
{
Message: string(snsEvent),
Source: "custom-source",
SourceCategory: "aws",
Service: "custom-source",
Metadata: testutil.LambdaOrigin(),
},
},
},
"custom service override": {
event: snsEvent,
cfg: &config.Config{Service: "my-svc"},
want: []model.LogEntry{
{
Message: string(snsEvent),
Source: "sns",
SourceCategory: "aws",
Service: "my-svc",
Metadata: testutil.LambdaOrigin(),
},
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

handler := NewSNS(tc.cfg)
out := make(chan model.LogEntry, len(tc.want))

err := handler.Handle(ctx, tc.event, out)
close(out)

require.NoError(t, err)

var got []model.LogEntry
for entry := range out {
got = append(got, entry)
}

assert.Equal(t, tc.want, got)
})
}
}
1 change: 1 addition & 0 deletions aws/logs_monitoring_go/internal/parsing/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
ContentTypeS3
ContentTypeKinesis
ContentTypeEventBridge
ContentTypeSNS
)

type ParsedEvent struct {
Expand Down
5 changes: 3 additions & 2 deletions aws/logs_monitoring_go/internal/parsing/content_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions aws/logs_monitoring_go/internal/parsing/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package parsing

import "fmt"

type KeyNotFoundError struct {
Key string
}

func (e *KeyNotFoundError) Error() string {
return fmt.Sprintf("%s key not found", e.Key)
}

func (e *KeyNotFoundError) Is(target error) bool {
_, ok := target.(*KeyNotFoundError)
return ok
}
20 changes: 8 additions & 12 deletions aws/logs_monitoring_go/internal/parsing/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package parsing
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -39,9 +38,8 @@ func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) {

func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) {
dec := json.NewDecoder(bytes.NewReader(event))

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return "", "", nil, errors.New("expected '{'")
if err := skipBrace(dec); err != nil {
return "", "", nil, err
}

for dec.More() {
Expand All @@ -64,9 +62,8 @@ func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string
return "", "", nil, fmt.Errorf("detail: %w", err)
}
default:
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return "", "", nil, fmt.Errorf("skip field: %w", err)
if err := skip(dec); err != nil {
return "", "", nil, err
}
}
}
Expand Down Expand Up @@ -99,8 +96,8 @@ func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error
func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) {
dec := json.NewDecoder(bytes.NewReader(detail))

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return "", "", errors.New("expected '{'")
if err := skipBrace(dec); err != nil {
return "", "", err
}

for dec.More() {
Expand All @@ -127,9 +124,8 @@ func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err
}
key = o.Key
default:
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return "", "", fmt.Errorf("skip field: %w", err)
if err := skip(dec); err != nil {
return "", "", err
}
}
}
Expand Down
Loading
Loading