Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) er
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatchLogs)
case parsing.InvocationSourceKinesis:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
case parsing.InvocationSourceSNS:
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleSNS)
case parsing.InvocationSourceS3:
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
default:
Expand Down
43 changes: 43 additions & 0 deletions aws/logs_monitoring_go/internal/parsing/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package parsing

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"testing"

"github.com/aws/aws-lambda-go/lambdacontext"
)

const testARN = "arn:aws:lambda:us-east-1:123456789012:function:forwarder"

var testLambdaCtx = lambdacontext.NewContext(context.Background(), &lambdacontext.LambdaContext{
InvokedFunctionArn: testARN,
})

func mustGzipJSON(t *testing.T, v any) []byte {
t.Helper()

raw, err := json.Marshal(v)
if err != nil {
t.Fatalf("marshal: %v", err)
}

var buf bytes.Buffer
w := gzip.NewWriter(&buf)
if _, err := w.Write(raw); err != nil {
t.Fatalf("gzip write: %v", err)
}

if err := w.Close(); err != nil {
t.Fatalf("gzip close: %v", err)
}

return buf.Bytes()
}
39 changes: 5 additions & 34 deletions aws/logs_monitoring_go/internal/parsing/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,18 @@
package parsing

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/google/go-cmp/cmp"
)

func TestHandleKinesis(t *testing.T) {
t.Parallel()

ctx := lambdacontext.NewContext(context.Background(), &lambdacontext.LambdaContext{
InvokedFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:forwarder",
})

tests := map[string]struct {
event json.RawMessage
config *config.Config
Expand Down Expand Up @@ -66,7 +58,7 @@ func TestHandleKinesis(t *testing.T) {
Host: "/aws/lambda/testing-datadog",
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
AWS: model.CloudwatchMetadata{
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
Metadata: model.Metadata{ARN: testARN},
Logs: model.CloudwatchLogsContext{
LogGroup: "/aws/lambda/testing-datadog",
LogStream: "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0",
Expand Down Expand Up @@ -106,7 +98,7 @@ func TestHandleKinesis(t *testing.T) {
Host: "/aws/lambda/fn-a",
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
AWS: model.CloudwatchMetadata{
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
Metadata: model.Metadata{ARN: testARN},
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/lambda/fn-a", LogStream: "stream-a", Owner: "111111111111"},
},
},
Expand All @@ -116,7 +108,7 @@ func TestHandleKinesis(t *testing.T) {
Host: "/aws/rds/cluster",
Tags: model.Tags{"service:cloudwatch", "forwardername:", "forwarder_version:6.0"},
AWS: model.CloudwatchMetadata{
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
Metadata: model.Metadata{ARN: testARN},
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/rds/cluster", LogStream: "stream-b", Owner: "222222222222"},
},
},
Expand Down Expand Up @@ -144,7 +136,7 @@ func TestHandleKinesis(t *testing.T) {
Host: "/aws/lambda/good",
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
AWS: model.CloudwatchMetadata{
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
Metadata: model.Metadata{ARN: testARN},
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/lambda/good", LogStream: "stream", Owner: "123456789012"},
},
},
Expand All @@ -158,7 +150,7 @@ func TestHandleKinesis(t *testing.T) {

out := make(chan model.CloudwatchLogEntry, tc.chanSize)

err := HandleKinesis(ctx, tc.event, tc.config, out)
err := HandleKinesis(testLambdaCtx, tc.event, tc.config, out)
close(out)

var got []model.CloudwatchLogEntry
Expand All @@ -183,27 +175,6 @@ func TestHandleKinesis(t *testing.T) {
}
}

func mustGzipJSON(t *testing.T, v any) []byte {
t.Helper()

raw, err := json.Marshal(v)
if err != nil {
t.Fatalf("marshal: %v", err)
}

var buf bytes.Buffer
w := gzip.NewWriter(&buf)
if _, err := w.Write(raw); err != nil {
t.Fatalf("gzip write: %v", err)
}

if err := w.Close(); err != nil {
t.Fatalf("gzip close: %v", err)
}

return buf.Bytes()
}

func mustKinesisEvent(t *testing.T, records ...[]byte) json.RawMessage {
t.Helper()

Expand Down
65 changes: 41 additions & 24 deletions aws/logs_monitoring_go/internal/parsing/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package parsing

import (
"cmp"
"context"
"encoding/json"
"fmt"
"log/slog"
"regexp"
"slices"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
Expand All @@ -19,7 +21,7 @@ import (
"github.com/aws/aws-lambda-go/events"
)

type s3RecordContext struct {
type s3Record struct {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small refactor to not confuse this the context package.

metadata model.Metadata
tags model.Tags
source string
Expand All @@ -30,19 +32,31 @@ type s3RecordContext struct {
}

func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.S3LogEntry) error {
var s3Event events.S3Event
if err := json.Unmarshal(event, &s3Event); err != nil {
return fmt.Errorf("unmarshal: %w", err)
client, metadata, err := setupS3(ctx, cfg)
if err != nil {
return fmt.Errorf("setup s3: %w", err)
}
return handleS3Event(ctx, event, cfg, client, metadata, out)
}

func setupS3(ctx context.Context, cfg *config.Config) (S3APIClient, model.Metadata, error) {
client, err := createS3APIClient(ctx, cfg.UseFIPS)
if err != nil {
return fmt.Errorf("create S3 client: %w", err)
return nil, model.Metadata{}, fmt.Errorf("create S3 client: %w", err)
}

forwarderMetadata, err := model.GetMetadata(ctx)
metadata, err := model.GetMetadata(ctx)
if err != nil {
return err
return nil, model.Metadata{}, fmt.Errorf("get metadata: %w", err)
}

return client, metadata, nil
}

func handleS3Event(ctx context.Context, event json.RawMessage, cfg *config.Config, client S3APIClient, metadata model.Metadata, out chan<- model.S3LogEntry) error {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleS3Event will be called for each events.SNSEvent.Records entry and we don't want to create multiple S3 clients : we initialise it outside

var s3Event events.S3Event
if err := json.Unmarshal(event, &s3Event); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}

for _, record := range s3Event.Records {
Expand All @@ -51,30 +65,36 @@ func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, ou

tags, service := getTagsAndService(cfg)
source := getS3Source(cfg.Source, key)
if service == "" {
service = source
}

rc := s3RecordContext{
forwarderMetadata, tags, source, service, bucket, key, cfg.S3MultilineLogRegex,
service = cmp.Or(service, source)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More elegant solution. Will update the Cloudwatch flow also in a next PR (avoids cluttering this one).


rc := s3Record{
metadata: metadata,
tags: tags,
source: source,
service: service,
bucket: bucket,
key: key,
multilineRegex: cfg.S3MultilineLogRegex,
}
if err := processS3Record(ctx, client, out, rc); err != nil {
return fmt.Errorf("process S3 record: %w", err)
slog.WarnContext(ctx, "skipping s3 record",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Standardizing the behavior between handlers.

"bucket", bucket, "key", key, "error", err)
continue
}
}

return nil
}

func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3RecordContext) error {
func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3Record) error {
body, err := getS3Object(ctx, client, rc.bucket, rc.key)
if err != nil {
return err
}

defer func() {
if err := body.Close(); err != nil {
slog.Warn("failed to close response body", slog.Any("error", err))
slog.WarnContext(ctx, "failed to close response body", slog.Any("error", err))
}
}()

Expand All @@ -87,21 +107,18 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S
}

if err := scanner.Err(); err != nil {
return fmt.Errorf("scan s3://%s/%s: %w", rc.bucket, rc.key, err)
return fmt.Errorf("scan: %w", err)
}

return nil
}

func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry {
func makeS3Entry(rc s3Record, message string) model.S3LogEntry {
ddtags, ddtagsService, message := extractFromMessage(message)

entryService := rc.service
if ddtagsService != "" {
entryService = ddtagsService
}
entryService := cmp.Or(ddtagsService, rc.service)

ddtags = append(ddtags, "service:"+entryService)
tags := slices.Concat(ddtags, model.Tags{"service:" + entryService}, rc.tags)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More elegant and performant solution since it is capped at most 1 allocation.

metadata := model.S3Metadata{
Metadata: rc.metadata,
S3Context: model.S3Context{
Expand All @@ -115,7 +132,7 @@ func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry {
Source: rc.source,
SourceCategory: sourceCategory,
Service: entryService,
Tags: append(ddtags, rc.tags...),
Tags: tags,
Metadata: metadata,
}
}
Expand Down
22 changes: 11 additions & 11 deletions aws/logs_monitoring_go/internal/parsing/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestProcessS3Record(t *testing.T) {
tests := map[string]struct {
mockSetup func(m *MockS3APIClient)
chanSize int
rc s3RecordContext
rc s3Record
want []model.S3LogEntry
wantErr bool
}{
Expand All @@ -35,7 +35,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 1,
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: []model.S3LogEntry{{
Message: "line1",
Source: "s3",
Expand All @@ -55,7 +55,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 3,
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: []model.S3LogEntry{
{Message: "line1", Source: "s3", SourceCategory: "aws", Service: "s3", Tags: model.Tags{"service:s3"}, Metadata: model.S3Metadata{S3Context: model.S3Context{Bucket: "b", Key: "k"}}},
{Message: "line2", Source: "s3", SourceCategory: "aws", Service: "s3", Tags: model.Tags{"service:s3"}, Metadata: model.S3Metadata{S3Context: model.S3Context{Bucket: "b", Key: "k"}}},
Expand All @@ -70,7 +70,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 0,
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: nil,
},
"s3_error": {
Expand All @@ -79,7 +79,7 @@ func TestProcessS3Record(t *testing.T) {
Return(nil, errors.New("access denied"))
},
chanSize: 1,
rc: s3RecordContext{bucket: "b", key: "k"},
rc: s3Record{bucket: "b", key: "k"},
wantErr: true,
},
"ddtags_extraction": {
Expand All @@ -90,7 +90,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 1,
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: []model.S3LogEntry{{
Message: `{"msg":"hello"}`,
Source: "s3",
Expand All @@ -108,7 +108,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 1,
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: []model.S3LogEntry{{
Message: "helloworld",
Source: "s3",
Expand All @@ -126,7 +126,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 2,
rc: s3RecordContext{
rc: s3Record{
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
},
Expand All @@ -143,7 +143,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 1,
rc: s3RecordContext{
rc: s3Record{
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
},
Expand All @@ -164,7 +164,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 3,
rc: s3RecordContext{
rc: s3Record{
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
},
Expand All @@ -182,7 +182,7 @@ func TestProcessS3Record(t *testing.T) {
}, nil)
},
chanSize: 1,
rc: s3RecordContext{tags: model.Tags{"env:prod", "team:aws"}, source: "s3", service: "s3", bucket: "b", key: "k"},
rc: s3Record{tags: model.Tags{"env:prod", "team:aws"}, source: "s3", service: "s3", bucket: "b", key: "k"},
want: []model.S3LogEntry{{
Message: "line1",
Source: "s3",
Expand Down
Loading
Loading