Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions aws/logs_monitoring_go/internal/handling/cloudtrail.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package handling

import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand All @@ -25,14 +24,7 @@ var (

func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
return func(yield func(string, error) bool) {
gz, err := gzip.NewReader(r)
Comment thread
ge0Aja marked this conversation as resolved.
if err != nil {
yield("", fmt.Errorf("gzip: %w", err))
return
}
defer gz.Close() //nolint:errcheck

dec := json.NewDecoder(gz)
dec := json.NewDecoder(r)
if err := parsing.SkipToRecords(dec); err != nil {
yield("", fmt.Errorf("cloudtrail: %w", err))
return
Expand Down
31 changes: 5 additions & 26 deletions aws/logs_monitoring_go/internal/handling/cloudtrail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"bytes"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -117,44 +116,24 @@ func TestDecodeCloudTrail(t *testing.T) {
wantErr bool
}{
"single record": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{
map[string]any{
"eventName": "DescribeTable",
"userIdentity": map[string]any{
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
},
},
},
}),
input: []byte(`{"Records":[{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}]}`),
want: []string{
`{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
},
},
"multiple records": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{
map[string]any{"eventName": "event1"},
map[string]any{"eventName": "event2"},
},
}),
input: []byte(`{"Records":[{"eventName":"event1"},{"eventName":"event2"}]}`),
want: []string{
`{"eventName":"event1"}`,
`{"eventName":"event2"}`,
},
},
"empty records array": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{},
}),
want: nil,
},
"invalid gzip": {
input: []byte("not gzip"),
wantErr: true,
input: []byte(`{"Records":[]}`),
want: nil,
},
"invalid json": {
input: testutil.MustGzipJSON(t, "not an object"),
input: []byte("not json"),
wantErr: true,
},
}
Expand Down
94 changes: 57 additions & 37 deletions aws/logs_monitoring_go/internal/handling/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
)

type EventBridgeHandler struct {
Expand All @@ -35,60 +35,80 @@ func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage,
return fmt.Errorf("get lambda origin: %w", err)
}

ebSource, err := decodeEventBridgeSource(event)
source, err := eventBridgeSource(event)
if err != nil {
return err
return fmt.Errorf("source: %w", err)
}
source := cmp.Or(h.cfg.Source, ebSource)
service := cmp.Or(h.cfg.Service, source)

entry := model.NewLogEntry()
entry.Message = string(event)
entry.Source = source
entry.Service = service
entry.Tags = h.cfg.Tags
entry.Metadata = lambdaOrigin
switch source {
case sourceSecurityHub:
return h.securityHub(ctx, event, source, out, lambdaOrigin)
default:
return h.eventBridge(ctx, event, source, out, lambdaOrigin)
}
}

if h.cfg.Filter.ShouldExclude(entry.Message) {
func (h *EventBridgeHandler) eventBridge(ctx context.Context, event json.RawMessage, source string, out chan<- model.LogEntry, lambdaOrigin model.LambdaOrigin) error {
message := string(event)
if h.cfg.Filter.ShouldExclude(message) {
return nil
}

entry.Message = h.cfg.Scrubber.Scrub(entry.Message)
entry := h.newEntry(source, lambdaOrigin)
entry.Message = h.cfg.Scrubber.Scrub(message)

return concurrent.SafeSender(ctx, out, entry)
}

func decodeEventBridgeSource(event json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(event))

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return "", errors.New("decode eventbridge source: expected '{'")
func (h *EventBridgeHandler) securityHub(ctx context.Context, event json.RawMessage, source string, out chan<- model.LogEntry, lambdaOrigin model.LambdaOrigin) error {
messages := separateFindings(event)
if len(messages) == 0 {
return h.eventBridge(ctx, event, source, out, lambdaOrigin)
}

for dec.More() {
key, err := dec.Token()
if err != nil {
return "", fmt.Errorf("decode eventbridge source: read key: %w", err)
}
if key == "source" {
var source string
if err := dec.Decode(&source); err != nil {
return "", fmt.Errorf("decode eventbridge source: %w", err)
}
return eventBridgeSource(source), nil
base := h.newEntry(source, lambdaOrigin)
for _, message := range messages {
if h.cfg.Filter.ShouldExclude(message) {
continue
}
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return "", fmt.Errorf("decode eventbridge source: skip field: %w", err)

entry := base
entry.Message = h.cfg.Scrubber.Scrub(message)

if err := concurrent.SafeSender(ctx, out, entry); err != nil {
return err
}
}
return nil
}

return "", nil
func (h *EventBridgeHandler) newEntry(source string, lambdaOrigin model.LambdaOrigin) model.LogEntry {
entry := model.NewLogEntry()
entry.Source = cmp.Or(h.cfg.Source, source)
entry.Service = cmp.Or(h.cfg.Service, entry.Source)
entry.Tags = h.cfg.Tags
entry.Metadata = lambdaOrigin
return entry
}

func eventBridgeSource(source string) string {
_, after, found := strings.Cut(source, ".")
func eventBridgeSource(event json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(event))
if err := parsing.SkipBrace(dec); err != nil {
return "", err
}

if err := parsing.SkipToKey(dec, "source"); err != nil {
return "", err
}

var raw string
if err := dec.Decode(&raw); err != nil {
return "", fmt.Errorf("decode: %w", err)
}

_, after, found := strings.Cut(raw, ".")
if found {
return after
return after, nil
}
return sourceCloudwatch
return sourceCloudwatch, nil
}
68 changes: 56 additions & 12 deletions aws/logs_monitoring_go/internal/handling/eventbridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,27 @@ func TestEventBridgeHandler_Handle(t *testing.T) {
cfg: testutil.EmptyConfig(),
wantErr: true,
},
"securityhub no findings falls back": {
event: json.RawMessage(`{"source":"aws.securityhub","detail":{}}`),
cfg: testutil.EmptyConfig(),
want: []model.LogEntry{
{
Message: `{"source":"aws.securityhub","detail":{}}`,
Source: sourceSecurityHub,
SourceCategory: "aws",
Service: sourceSecurityHub,
Metadata: testutil.LambdaOrigin(),
},
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

handler := NewEventBridge(tc.cfg)
out := make(chan model.LogEntry, len(tc.want))
out := make(chan model.LogEntry, 10)

err := handler.Handle(ctx, tc.event, out)
close(out)
Expand All @@ -100,26 +113,57 @@ func TestEventBridgeHandler_Handle(t *testing.T) {
}
}

func TestEventBridgeSource(t *testing.T) {
func TestEventBridgeHandler_SecurityHub(t *testing.T) {
t.Parallel()

ctx := testutil.LambdaContext(t)

tests := map[string]struct {
source string
want string
event json.RawMessage
cfg *config.Config
want []string
}{
"aws.events": {source: "aws.events", want: "events"},
"aws.ec2": {source: "aws.ec2", want: "ec2"},
"aws.s3": {source: "aws.s3", want: "s3"},
"custom.app": {source: "custom.app", want: "app"},
"no dot": {source: "nodot", want: "cloudwatch"},
"empty string": {source: "", want: "cloudwatch"},
"one finding": {
event: json.RawMessage(`{"source":"aws.securityhub","detail-type":"Security Hub Findings - Imported","detail":{"findings":[{"myattribute":"somevalue","Resources":[{"Region":"us-east-1","Type":"AwsEc2SecurityGroup"}]}]}}`),
cfg: testutil.EmptyConfig(),
want: []string{`{"source":"aws.securityhub","detail-type":"Security Hub Findings - Imported","detail":{"finding":{"myattribute":"somevalue","resources":{"AwsEc2SecurityGroup":{"Region":"us-east-1"}}}}}`},
},
"multiple findings": {
event: json.RawMessage(`{"source":"aws.securityhub","detail":{"findings":[{"id":"f1","Resources":[{"Type":"AwsEc2SecurityGroup","Region":"us-east-1"}]},{"id":"f2","Resources":[{"Type":"AwsIamRole","Region":"us-west-2"}]}]}}`),
cfg: testutil.EmptyConfig(),
want: []string{
`{"source":"aws.securityhub","detail":{"finding":{"id":"f1","resources":{"AwsEc2SecurityGroup":{"Region":"us-east-1"}}}}}`,
`{"source":"aws.securityhub","detail":{"finding":{"id":"f2","resources":{"AwsIamRole":{"Region":"us-west-2"}}}}}`,
},
},
"with filtering": {
event: json.RawMessage(`{"source":"aws.securityhub","detail":{"findings":[{"id":"keep","Resources":[]},{"id":"drop","Resources":[]}]}}`),
cfg: testutil.Config(t, testutil.WithExcludeFilter(`"id":"drop"`)),
want: []string{`{"source":"aws.securityhub","detail":{"finding":{"id":"keep","resources":{}}}}`},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
got := eventBridgeSource(tc.source)
assert.Equal(t, tc.want, got)

handler := NewEventBridge(tc.cfg)
out := make(chan model.LogEntry, 10)

err := handler.Handle(ctx, tc.event, out)
close(out)

require.NoError(t, err)

var got []model.LogEntry
for entry := range out {
got = append(got, entry)
}

require.Len(t, got, len(tc.want))
for i := range tc.want {
assert.JSONEq(t, tc.want[i], got[i].Message)
}
})
}
}
30 changes: 30 additions & 0 deletions aws/logs_monitoring_go/internal/handling/gunzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
)

var gzipMagic = []byte{0x1f, 0x8b}

func gunzip(r io.Reader) (io.Reader, func() error, error) {
buf := bufio.NewReaderSize(r, len(gzipMagic))
header, err := buf.Peek(len(gzipMagic))
if err != nil || !bytes.Equal(header, gzipMagic) {
return buf, func() error { return nil }, nil
}

gz, err := gzip.NewReader(buf)
if err != nil {
return nil, nil, fmt.Errorf("gzip: %w", err)
}
return gz, func() error { return gz.Close() }, nil
}
Loading
Loading