From 363def9519183ac6c3d7d39895b79adc70737b51 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 1 Apr 2026 18:02:03 +0200 Subject: [PATCH 01/14] [AWSINTS-3450] feat(go-forwarder): add CloudwatchLogEntry creation logic --- .../internal/parsing/cloudwatchlogs.go | 131 ++++++++++++++++++ .../internal/parsing/parsing.go | 4 - .../internal/parsing/tags.go | 42 ++++++ 3 files changed, 173 insertions(+), 4 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go create mode 100644 aws/logs_monitoring_go/internal/parsing/tags.go diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go new file mode 100644 index 000000000..7f85e1bbc --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -0,0 +1,131 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "context" + "encoding/json" + "log/slog" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { + var cwEvent events.CloudwatchLogsEvent + if err := json.Unmarshal(event, &cwEvent); err != nil { + slog.Error("failed to unmarshal cloudwatch event", slog.Any("error", err)) + return + } + + data, err := cwEvent.AWSLogs.Parse() + if err != nil { + slog.Error("failed to decompress cloudwatch data", slog.Any("error", err)) + return + } + + if data.MessageType == "CONTROL_MESSAGE" { + return + } + + source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream) + metadata := getCloudwatchMetadata(ctx, data) + host := getCloudwatchHost(cfg.Host, data.LogGroup) + tags, service := getTagsAndService(*cfg) + if service == "" { + service = source + } + + for _, le := range data.LogEvents { + entry := model.CloudwatchLogEntry{ + ID: le.ID, + Timestamp: le.Timestamp, + Message: le.Message, + Source: source, + Service: service, + Host: host, + Tags: tags, + AWS: metadata, + } + + select { + case out <- entry: + case <-ctx.Done(): + return + } + } +} + +func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { + if sourceOverride != "" { + return sourceOverride + } + + var source string + if strings.Contains(logStream, "_CloudTrail_") { + source = "cloudtrail" + } else { + source = getSourceFromLogGroup(strings.ToLower(logGroup)) + } + + if strings.HasPrefix(logStream, "states/") { + source = "stepfunction" + } + + return source +} + +func getSourceFromLogGroup(logGroupLower string) string { + if strings.HasPrefix(logGroupLower, "_cloudtrail_") { + return "cloudtrail" + } + if strings.HasPrefix(logGroupLower, "/aws/kinesis") { + return "kinesis" + } + if strings.HasPrefix(logGroupLower, "/aws/lambda") { + return "lambda" + } + if strings.HasPrefix(logGroupLower, "sns/") { + return "sns" + } + if strings.Contains(logGroupLower, "cloudtrail") { + return "cloudtrail" + } + return "cloudwatch" +} + +func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) model.CloudwatchMetadata { + metadata := model.CloudwatchMetadata{ + Logs: model.CloudwatchLogsContext{ + LogGroup: data.LogGroup, + LogStream: data.LogStream, + Owner: data.Owner, + }, + } + + if lambdacontext.FunctionVersion != "$LATEST" { + metadata.FunctionVersion = lambdacontext.FunctionVersion + } + + if lc, ok := lambdacontext.FromContext(ctx); ok { + metadata.InvokedFunctionARN = lc.InvokedFunctionArn + } else { + slog.Warn("failed lambda context loading") + } + + return metadata +} + +func getCloudwatchHost(hostOverride, logGroup string) string { + if hostOverride != "" { + return hostOverride + } + + return logGroup +} diff --git a/aws/logs_monitoring_go/internal/parsing/parsing.go b/aws/logs_monitoring_go/internal/parsing/parsing.go index fe8b41461..aa7684bc4 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing.go @@ -100,7 +100,3 @@ func detectFromRecords(dec *json.Decoder) invocationSource { return invocationSourceUnknown } - -func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { - -} diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go new file mode 100644 index 000000000..64efbe048 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -0,0 +1,42 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +func getTagsAndService(cfg config.Config) ([]string, string) { + var tags []string + var service string + + if cfg.CustomTags != "" { + for _, tag := range strings.Split(cfg.CustomTags, ",") { + if strings.HasPrefix(tag, "service:") { + if service != "" { + continue + } + service = tag[8:] + } + + tags = append(tags, tag) + } + } + + if cfg.Source != "" { + tags = append(tags, "source_overridden:true") + } + + tags = append(tags, + "forwardername:"+strings.ToLower(lambdacontext.FunctionName), + "forwarder_version:"+config.ForwarderVersion, + ) + + return tags, service +} From 62971ce91039b4575d924462051f7759554ad047 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 2 Apr 2026 17:50:41 +0200 Subject: [PATCH 02/14] Add SafeXXX in concurrent.go, pipeline.go with generics, make the Handler return the error and update main with source detection + pipeline trigger --- aws/logs_monitoring_go/cmd/forwarder/main.go | 12 +++- .../internal/parsing/cloudwatchlogs.go | 45 +++++++++----- .../internal/parsing/parsing.go | 62 +++++++------------ .../internal/parsing/parsing_bench_test.go | 2 +- .../internal/parsing/parsing_test.go | 14 ++--- .../internal/pipeline/pipeline.go | 41 ++++++++++++ 6 files changed, 110 insertions(+), 66 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/pipeline/pipeline.go diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index 5b8f8649b..31ba4442f 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -11,6 +11,8 @@ import ( "log/slog" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" "github.com/aws/aws-lambda-go/lambda" ) @@ -26,10 +28,14 @@ func main() { lambda.Start(handleRequest(cfg)) } -// cfg not used for now, will be when forwarding logic added func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - slog.Info("received event", slog.String("event", string(event))) - return nil + switch parsing.DetectInvocationSource(event) { + case parsing.InvocationSourceCloudwatchLogs: + return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs) + default: + slog.Error("unsupported invocation source") + return nil + } } } diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index 7f85e1bbc..0a80a5f91 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -11,27 +11,41 @@ import ( "log/slog" "strings" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambdacontext" ) -func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { +func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { + logEntries, err := parseCloudwatchLogs(ctx, event, cfg) + if err != nil { + slog.Error("failed parse cloudwatch logs", slog.Any("error", err)) + } + + for _, logEntry := range logEntries { + if err := concurrent.SafeSender(ctx, out, logEntry); err != nil { + return err + } + } + + return nil +} + +func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config) ([]model.CloudwatchLogEntry, error) { var cwEvent events.CloudwatchLogsEvent if err := json.Unmarshal(event, &cwEvent); err != nil { - slog.Error("failed to unmarshal cloudwatch event", slog.Any("error", err)) - return + return nil, err } data, err := cwEvent.AWSLogs.Parse() if err != nil { - slog.Error("failed to decompress cloudwatch data", slog.Any("error", err)) - return + return nil, err } if data.MessageType == "CONTROL_MESSAGE" { - return + return nil, nil } source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream) @@ -42,6 +56,7 @@ func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config service = source } + var entries []model.CloudwatchLogEntry for _, le := range data.LogEvents { entry := model.CloudwatchLogEntry{ ID: le.ID, @@ -53,13 +68,10 @@ func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config Tags: tags, AWS: metadata, } - - select { - case out <- entry: - case <-ctx.Done(): - return - } + entries = append(entries, entry) } + + return entries, nil } func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { @@ -67,17 +79,16 @@ func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { return sourceOverride } + if strings.HasPrefix(logStream, "states/") { + return "stepfunction" + } + var source string if strings.Contains(logStream, "_CloudTrail_") { source = "cloudtrail" } else { source = getSourceFromLogGroup(strings.ToLower(logGroup)) } - - if strings.HasPrefix(logStream, "states/") { - source = "stepfunction" - } - return source } diff --git a/aws/logs_monitoring_go/internal/parsing/parsing.go b/aws/logs_monitoring_go/internal/parsing/parsing.go index aa7684bc4..cf5b24ae3 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing.go @@ -7,96 +7,82 @@ package parsing import ( "bytes" - "context" "encoding/json" - "log/slog" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) -type invocationSource int +type InvocationSource int const ( - invocationSourceUnknown invocationSource = iota - invocationSourceCloudwatchLogs - invocationSourceS3 - invocationSourceSNS - invocationSourceSQS - invocationSourceKinesis + InvocationSourceUnknown InvocationSource = iota + InvocationSourceCloudwatchLogs + InvocationSourceS3 + InvocationSourceSNS + InvocationSourceSQS + InvocationSourceKinesis ) -func Parse(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { - switch detectInvocationSource(event) { - case invocationSourceCloudwatchLogs: - parseCloudwatchLogs(ctx, event, cfg, out) - default: - slog.Error("unsupported invocation source", slog.String("event", string(event))) - } -} - -func detectInvocationSource(event json.RawMessage) invocationSource { +func DetectInvocationSource(event json.RawMessage) InvocationSource { dec := json.NewDecoder(bytes.NewReader(event)) t, err := dec.Token() if err != nil || t != json.Delim('{') { - return invocationSourceUnknown + return InvocationSourceUnknown } key, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } if key == "awslogs" { - return invocationSourceCloudwatchLogs + return InvocationSourceCloudwatchLogs } if key == "Records" { return detectFromRecords(dec) } - return invocationSourceUnknown + return InvocationSourceUnknown } -func detectFromRecords(dec *json.Decoder) invocationSource { +func detectFromRecords(dec *json.Decoder) InvocationSource { t, err := dec.Token() if err != nil || t != json.Delim('[') { - return invocationSourceUnknown + return InvocationSourceUnknown } t, err = dec.Token() if err != nil || t != json.Delim('{') { - return invocationSourceUnknown + return InvocationSourceUnknown } for dec.More() { key, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } if key == "eventSource" { val, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } switch val { case "aws:s3": - return invocationSourceS3 + return InvocationSourceS3 case "aws:sns": - return invocationSourceSNS + return InvocationSourceSNS case "aws:sqs": - return invocationSourceSQS + return InvocationSourceSQS case "aws:kinesis": - return invocationSourceKinesis + return InvocationSourceKinesis default: - return invocationSourceUnknown + return InvocationSourceUnknown } } var skip json.RawMessage if err := dec.Decode(&skip); err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } } - return invocationSourceUnknown + return InvocationSourceUnknown } diff --git a/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go b/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go index ff119f61b..ecd5c3765 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go @@ -34,7 +34,7 @@ func BenchmarkDetectInvocationSource(b *testing.B) { for _, tc := range benchCases { b.Run(tc.name, func(b *testing.B) { for b.Loop() { - detectInvocationSource(tc.event) + DetectInvocationSource(tc.event) } }) } diff --git a/aws/logs_monitoring_go/internal/parsing/parsing_test.go b/aws/logs_monitoring_go/internal/parsing/parsing_test.go index 8072f532d..687edab57 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing_test.go @@ -13,33 +13,33 @@ import ( func TestDetectInvocationSource(t *testing.T) { tests := map[string]struct { event json.RawMessage - wantKey invocationSource + wantKey InvocationSource }{ "cloudwatch logs": { event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - wantKey: invocationSourceCloudwatchLogs, + wantKey: InvocationSourceCloudwatchLogs, }, "empty object": { event: json.RawMessage(`{}`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "s3 not yet implemented": { event: json.RawMessage(`{"Records":[{"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}]}`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "not JSON": { event: json.RawMessage(`not a json`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "empty input": { event: json.RawMessage(``), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - got := detectInvocationSource(tc.event) + got := DetectInvocationSource(tc.event) if got != tc.wantKey { t.Errorf("got %d, want %d", got, tc.wantKey) } diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go new file mode 100644 index 000000000..5aa3fd387 --- /dev/null +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -0,0 +1,41 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package pipeline + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "golang.org/x/sync/errgroup" +) + +func Run[T any]( + ctx context.Context, + event json.RawMessage, + cfg *config.Config, + handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error, +) error { + var g errgroup.Group + + entries := make(chan T) + + g.Go(func() error { + defer close(entries) + handler(ctx, event, cfg, entries) + return nil + }) + + g.Go(func() error { + for entry := range entries { + fmt.Println(entry) + } + return nil + }) + + return g.Wait() +} From 87f74ed762de57806749eb8678d61fc98d5c43d4 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 2 Apr 2026 17:59:42 +0200 Subject: [PATCH 03/14] propagate error --- aws/logs_monitoring_go/internal/pipeline/pipeline.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 5aa3fd387..eb16c891d 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -26,8 +26,7 @@ func Run[T any]( g.Go(func() error { defer close(entries) - handler(ctx, event, cfg, entries) - return nil + return handler(ctx, event, cfg, entries) }) g.Go(func() error { From 1757e457ce889676c1b863df56dcdd2d1e34165c Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Thu, 9 Apr 2026 15:57:34 +0200 Subject: [PATCH 04/14] refactor: add Tags type to prevent Unmarshal to create an array --- aws/logs_monitoring_go/internal/model/tags.go | 15 --------------- .../internal/model/tags_test.go | 14 ++------------ aws/logs_monitoring_go/internal/parsing/tags.go | 5 +++-- 3 files changed, 5 insertions(+), 29 deletions(-) diff --git a/aws/logs_monitoring_go/internal/model/tags.go b/aws/logs_monitoring_go/internal/model/tags.go index 0c3cc4094..8871e8a3d 100644 --- a/aws/logs_monitoring_go/internal/model/tags.go +++ b/aws/logs_monitoring_go/internal/model/tags.go @@ -15,18 +15,3 @@ type Tags []string func (t Tags) MarshalJSON() ([]byte, error) { return json.Marshal(strings.Join(t, ",")) } - -func (t *Tags) UnmarshalJSON(data []byte) error { - var s string - - if err := json.Unmarshal(data, &s); err != nil { - return err - } - if s == "" { - *t = nil - return nil - } - - *t = strings.Split(s, ",") - return nil -} diff --git a/aws/logs_monitoring_go/internal/model/tags_test.go b/aws/logs_monitoring_go/internal/model/tags_test.go index 4b184e220..88ef157a2 100644 --- a/aws/logs_monitoring_go/internal/model/tags_test.go +++ b/aws/logs_monitoring_go/internal/model/tags_test.go @@ -7,11 +7,10 @@ package model import ( "encoding/json" - "slices" "testing" ) -func TestTags(t *testing.T) { +func TestTagsMarshalJSON(t *testing.T) { t.Parallel() tests := map[string]struct { @@ -41,20 +40,11 @@ func TestTags(t *testing.T) { t.Parallel() got, err := json.Marshal(tc.tags) if err != nil { - t.Fatalf("unexpected marshal error: %v", err) + t.Fatalf("unexpected error: %v", err) } if string(got) != tc.want { t.Errorf("got %s, want %s", got, tc.want) } - - var tags Tags - err = json.Unmarshal(got, &tags) - if err != nil { - t.Fatalf("unexpected unmarshal error: %v", err) - } - if !slices.Equal(tc.tags, tags) { - t.Errorf("expected %v, got %v", tc.tags, tags) - } }) } } diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go index 64efbe048..6530c647a 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags.go +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -9,11 +9,12 @@ import ( "strings" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/aws/aws-lambda-go/lambdacontext" ) -func getTagsAndService(cfg config.Config) ([]string, string) { - var tags []string +func getTagsAndService(cfg config.Config) (model.Tags, string) { + var tags model.Tags var service string if cfg.CustomTags != "" { From d7606745414adc17591d735cecf7fb513f477dab Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 10 Apr 2026 17:43:37 +0200 Subject: [PATCH 05/14] add unmarshal --- aws/logs_monitoring_go/internal/model/tags.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/aws/logs_monitoring_go/internal/model/tags.go b/aws/logs_monitoring_go/internal/model/tags.go index 8871e8a3d..0c3cc4094 100644 --- a/aws/logs_monitoring_go/internal/model/tags.go +++ b/aws/logs_monitoring_go/internal/model/tags.go @@ -15,3 +15,18 @@ type Tags []string func (t Tags) MarshalJSON() ([]byte, error) { return json.Marshal(strings.Join(t, ",")) } + +func (t *Tags) UnmarshalJSON(data []byte) error { + var s string + + if err := json.Unmarshal(data, &s); err != nil { + return err + } + if s == "" { + *t = nil + return nil + } + + *t = strings.Split(s, ",") + return nil +} From 4dbd0e9c20246bd301398ffec9693e071ddc3ab2 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Fri, 10 Apr 2026 18:01:39 +0200 Subject: [PATCH 06/14] concurrent forawrding + update pipeline --- .../internal/forwarding/forwarding.go | 54 ++++++++++++------- .../internal/forwarding/forwarding_test.go | 4 +- .../internal/pipeline/pipeline.go | 21 ++++++-- 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index d49ce98d9..fb6ce9227 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -16,32 +16,48 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "golang.org/x/sync/errgroup" ) +const numWorkers = 3 + type Forwarder struct { - Config config.Config - Client *http.Client + config *config.Config + client *http.Client } -func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { - for { - body, ok, err := concurrent.SafeReader(ctx, in) - if err != nil { - return err - } - if !ok { - break - } +func NewForwarder(config *config.Config, client *http.Client) *Forwarder { + return &Forwarder{ + config: config, + client: client, + } +} - if err := f.send(ctx, body); err != nil { - return err - } +func (f *Forwarder) Forward(ctx context.Context, in <-chan []byte) error { + g, ctx := errgroup.WithContext(ctx) + + for range numWorkers { + g.Go(func() error { + for { + body, ok, err := concurrent.SafeReader(ctx, in) + if err != nil { + return err + } + if !ok { + return nil + } + + if err := f.send(ctx, body); err != nil { + return err + } + } + }) } - return nil + return g.Wait() } -func (f Forwarder) send(ctx context.Context, body []byte) error { +func (f *Forwarder) send(ctx context.Context, body []byte) error { var compressedBody bytes.Buffer zw := gzip.NewWriter(&compressedBody) if _, err := zw.Write(body); err != nil { @@ -51,18 +67,18 @@ func (f Forwarder) send(ctx context.Context, body []byte) error { return fmt.Errorf("closing gzip writer: %w", err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.Config.IntakeURL, bytes.NewReader(compressedBody.Bytes())) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.config.IntakeURL, bytes.NewReader(compressedBody.Bytes())) if err != nil { return err } - req.Header.Set("DD-API-KEY", f.Config.APIKey) + req.Header.Set("DD-API-KEY", f.config.APIKey) req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Encoding", "gzip") req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder") req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion) - resp, err := f.Client.Do(req) + resp, err := f.client.Do(req) if err != nil { return fmt.Errorf("sending to intake: %w", err) } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go index 864c21db4..649d14a30 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go @@ -112,11 +112,11 @@ func TestForward(t *testing.T) { defer server.Close() f := Forwarder{ - Config: config.Config{ + config: &config.Config{ IntakeURL: server.URL, APIKey: "test-api-key", }, - Client: server.Client(), + client: server.Client(), } ctx, cancel := context.WithCancel(context.Background()) diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index eb16c891d..d0c32e257 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -8,9 +8,12 @@ package pipeline import ( "context" "encoding/json" - "fmt" + "net/http" + "time" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/processing" "golang.org/x/sync/errgroup" ) @@ -23,6 +26,12 @@ func Run[T any]( var g errgroup.Group entries := make(chan T) + batches := make(chan []byte) + + batcher := processing.NewBatcher[T]() + forwarder := forwarding.NewForwarder(cfg, &http.Client{ + Timeout: 10 * time.Second, + }) g.Go(func() error { defer close(entries) @@ -30,10 +39,12 @@ func Run[T any]( }) g.Go(func() error { - for entry := range entries { - fmt.Println(entry) - } - return nil + defer close(batches) + return batcher.Batch(ctx, entries, batches) + }) + + g.Go(func() error { + return forwarder.Forward(ctx, batches) }) return g.Wait() From 8795b6b1b8c5d571b4b63afe19ababc30f5660b0 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Mon, 13 Apr 2026 10:04:42 +0200 Subject: [PATCH 07/14] refactor(go-forwarder): Tags struct to marshall properly (#1105) --- aws/logs_monitoring_go/internal/model/tags_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/aws/logs_monitoring_go/internal/model/tags_test.go b/aws/logs_monitoring_go/internal/model/tags_test.go index 88ef157a2..4b184e220 100644 --- a/aws/logs_monitoring_go/internal/model/tags_test.go +++ b/aws/logs_monitoring_go/internal/model/tags_test.go @@ -7,10 +7,11 @@ package model import ( "encoding/json" + "slices" "testing" ) -func TestTagsMarshalJSON(t *testing.T) { +func TestTags(t *testing.T) { t.Parallel() tests := map[string]struct { @@ -40,11 +41,20 @@ func TestTagsMarshalJSON(t *testing.T) { t.Parallel() got, err := json.Marshal(tc.tags) if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatalf("unexpected marshal error: %v", err) } if string(got) != tc.want { t.Errorf("got %s, want %s", got, tc.want) } + + var tags Tags + err = json.Unmarshal(got, &tags) + if err != nil { + t.Fatalf("unexpected unmarshal error: %v", err) + } + if !slices.Equal(tc.tags, tags) { + t.Errorf("expected %v, got %v", tc.tags, tags) + } }) } } From c4d9c8156421c43d56a5393b1a42407fba960073 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Mon, 13 Apr 2026 11:07:52 +0200 Subject: [PATCH 08/14] Use pointer field, but value struct. --- .../internal/forwarding/forwarding.go | 8 ++++---- .../internal/forwarding/forwarding_test.go | 11 ++++------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index fb6ce9227..3e4edd6be 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -26,14 +26,14 @@ type Forwarder struct { client *http.Client } -func NewForwarder(config *config.Config, client *http.Client) *Forwarder { - return &Forwarder{ +func NewForwarder(config *config.Config, client *http.Client) Forwarder { + return Forwarder{ config: config, client: client, } } -func (f *Forwarder) Forward(ctx context.Context, in <-chan []byte) error { +func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { g, ctx := errgroup.WithContext(ctx) for range numWorkers { @@ -57,7 +57,7 @@ func (f *Forwarder) Forward(ctx context.Context, in <-chan []byte) error { return g.Wait() } -func (f *Forwarder) send(ctx context.Context, body []byte) error { +func (f Forwarder) send(ctx context.Context, body []byte) error { var compressedBody bytes.Buffer zw := gzip.NewWriter(&compressedBody) if _, err := zw.Write(body); err != nil { diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go index 649d14a30..20468371a 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go @@ -111,13 +111,10 @@ func TestForward(t *testing.T) { })) defer server.Close() - f := Forwarder{ - config: &config.Config{ - IntakeURL: server.URL, - APIKey: "test-api-key", - }, - client: server.Client(), - } + f := NewForwarder(&config.Config{ + IntakeURL: server.URL, + APIKey: "test-api-key", + }, server.Client()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() From d05ffdb1e6f5e634c5a97075e2c07b97afd8ef27 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 11:04:48 +0200 Subject: [PATCH 09/14] update with vincent comments --- aws/logs_monitoring_go/cmd/forwarder/main.go | 5 ++-- .../internal/forwarding/forwarding.go | 1 + .../internal/parsing/cloudwatchlogs.go | 2 +- .../parsing/invocationsource_string.go | 29 +++++++++++++++++++ .../internal/parsing/parsing.go | 1 + .../internal/pipeline/pipeline.go | 6 ++-- 6 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/parsing/invocationsource_string.go diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index 31ba4442f..76eee1c5c 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -30,11 +30,12 @@ func main() { func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - switch parsing.DetectInvocationSource(event) { + invocationSource := parsing.DetectInvocationSource(event) + switch invocationSource { case parsing.InvocationSourceCloudwatchLogs: return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs) default: - slog.Error("unsupported invocation source") + slog.Error("unsupported invocation source", slog.String("source", invocationSource.String())) return nil } } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index 3e4edd6be..a68901dab 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -57,6 +57,7 @@ func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { return g.Wait() } +// TODO: add retry mechanism for resiliency func (f Forwarder) send(ctx context.Context, body []byte) error { var compressedBody bytes.Buffer zw := gzip.NewWriter(&compressedBody) diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index 0a80a5f91..30feb74de 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -127,7 +127,7 @@ func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) if lc, ok := lambdacontext.FromContext(ctx); ok { metadata.InvokedFunctionARN = lc.InvokedFunctionArn } else { - slog.Warn("failed lambda context loading") + slog.Warn("failed to load lambda context, this should not happen in production. The code is either not running from AWS Lambda or context is broken.") } return metadata diff --git a/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go b/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go new file mode 100644 index 000000000..7d7b15b95 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type InvocationSource -trimprefix InvocationSource"; DO NOT EDIT. + +package parsing + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[InvocationSourceUnknown-0] + _ = x[InvocationSourceCloudwatchLogs-1] + _ = x[InvocationSourceS3-2] + _ = x[InvocationSourceSNS-3] + _ = x[InvocationSourceSQS-4] + _ = x[InvocationSourceKinesis-5] +} + +const _InvocationSource_name = "UnknownCloudwatchLogsS3SNSSQSKinesis" + +var _InvocationSource_index = [...]uint8{0, 7, 21, 23, 26, 29, 36} + +func (i InvocationSource) String() string { + idx := int(i) - 0 + if i < 0 || idx >= len(_InvocationSource_index)-1 { + return "InvocationSource(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _InvocationSource_name[_InvocationSource_index[idx]:_InvocationSource_index[idx+1]] +} diff --git a/aws/logs_monitoring_go/internal/parsing/parsing.go b/aws/logs_monitoring_go/internal/parsing/parsing.go index cf5b24ae3..f80440735 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing.go @@ -10,6 +10,7 @@ import ( "encoding/json" ) +//go:generate stringer -type InvocationSource -trimprefix InvocationSource type InvocationSource int const ( diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index d0c32e257..9817d562e 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -17,20 +17,22 @@ import ( "golang.org/x/sync/errgroup" ) +const timeout = 10 * time.Second + func Run[T any]( ctx context.Context, event json.RawMessage, cfg *config.Config, handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error, ) error { - var g errgroup.Group + g, ctx := errgroup.WithContext(ctx) // Fragile because if one goroutine fails, all stages will stop gracefully entries := make(chan T) batches := make(chan []byte) batcher := processing.NewBatcher[T]() forwarder := forwarding.NewForwarder(cfg, &http.Client{ - Timeout: 10 * time.Second, + Timeout: timeout, }) g.Go(func() error { From 9dd1b5a34d80afa4207a66fa641eab039bcd5122 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 15:31:53 +0200 Subject: [PATCH 10/14] Update aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go Co-authored-by: Vincent Boutour --- aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index 30feb74de..a257a3332 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -36,7 +36,7 @@ func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *confi func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config) ([]model.CloudwatchLogEntry, error) { var cwEvent events.CloudwatchLogsEvent if err := json.Unmarshal(event, &cwEvent); err != nil { - return nil, err + return nil, fmt.Errorf("unmarshal: %w", err) } data, err := cwEvent.AWSLogs.Parse() From d543cfab6d5b745c1e0d3c29c53865006a9c5bb5 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 15:32:50 +0200 Subject: [PATCH 11/14] Update aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go Co-authored-by: Vincent Boutour --- aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index a257a3332..eb2e14db5 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -41,7 +41,7 @@ func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config data, err := cwEvent.AWSLogs.Parse() if err != nil { - return nil, err + return nil, fmt.Errorf("parse: %w", err) } if data.MessageType == "CONTROL_MESSAGE" { From 5349a82e5607a00c9644d5a961529aca5420f618 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 15:50:15 +0200 Subject: [PATCH 12/14] splitseq and error message clarification --- .../internal/parsing/cloudwatchlogs.go | 11 +++++------ aws/logs_monitoring_go/internal/parsing/tags.go | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index eb2e14db5..34851130a 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -8,6 +8,7 @@ package parsing import ( "context" "encoding/json" + "fmt" "log/slog" "strings" @@ -21,7 +22,7 @@ import ( func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { logEntries, err := parseCloudwatchLogs(ctx, event, cfg) if err != nil { - slog.Error("failed parse cloudwatch logs", slog.Any("error", err)) + return err } for _, logEntry := range logEntries { @@ -83,13 +84,11 @@ func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { return "stepfunction" } - var source string if strings.Contains(logStream, "_CloudTrail_") { - source = "cloudtrail" - } else { - source = getSourceFromLogGroup(strings.ToLower(logGroup)) + return "cloudtrail" } - return source + + return getSourceFromLogGroup(strings.ToLower(logGroup)) } func getSourceFromLogGroup(logGroupLower string) string { diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go index 6530c647a..b6889e4a8 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags.go +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -18,7 +18,7 @@ func getTagsAndService(cfg config.Config) (model.Tags, string) { var service string if cfg.CustomTags != "" { - for _, tag := range strings.Split(cfg.CustomTags, ",") { + for tag := range strings.SplitSeq(cfg.CustomTags, ",") { if strings.HasPrefix(tag, "service:") { if service != "" { continue From aba33a66eed4073095c26f68e1563fb43d7878db Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 15:52:12 +0200 Subject: [PATCH 13/14] [AWSINTS-3493] fix(go-forwarder): add ddtags extraction from message (#1106) --- .../internal/parsing/cloudwatchlogs.go | 13 +- .../internal/parsing/tags.go | 51 +++++- .../internal/parsing/tags_test.go | 164 ++++++++++++++++++ 3 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/parsing/tags_test.go diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index 34851130a..9e9383b91 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -59,14 +59,21 @@ func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config var entries []model.CloudwatchLogEntry for _, le := range data.LogEvents { + ddtags, ddtagsService, message := extractFromMessage(le.Message) + entryService := service + if ddtagsService != "" { + entryService = ddtagsService + } + ddtags = append(ddtags, "service:"+entryService) + entry := model.CloudwatchLogEntry{ ID: le.ID, Timestamp: le.Timestamp, - Message: le.Message, + Message: message, Source: source, - Service: service, + Service: entryService, Host: host, - Tags: tags, + Tags: append(ddtags, tags...), AWS: metadata, } entries = append(entries, entry) diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go index b6889e4a8..6c3b94f77 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags.go +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -6,6 +6,7 @@ package parsing import ( + "encoding/json" "strings" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" @@ -13,6 +14,8 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" ) +const DdtagsKey = "ddtags" + func getTagsAndService(cfg config.Config) (model.Tags, string) { var tags model.Tags var service string @@ -20,10 +23,10 @@ func getTagsAndService(cfg config.Config) (model.Tags, string) { if cfg.CustomTags != "" { for tag := range strings.SplitSeq(cfg.CustomTags, ",") { if strings.HasPrefix(tag, "service:") { - if service != "" { - continue + if service == "" { + service = tag[8:] } - service = tag[8:] + continue } tags = append(tags, tag) @@ -41,3 +44,45 @@ func getTagsAndService(cfg config.Config) (model.Tags, string) { return tags, service } + +func extractFromMessage(message string) (model.Tags, string, string) { + var tags model.Tags + var service string + + var jsonMessage map[string]any + if err := json.Unmarshal([]byte(message), &jsonMessage); err != nil { + return nil, service, message + } + + ddtagsRaw, ok := jsonMessage[DdtagsKey] + if !ok { + return nil, service, message + } + + ddtagsStr, ok := ddtagsRaw.(string) + if !ok { + return nil, service, message + } + + ddtagsStr = strings.ReplaceAll(ddtagsStr, " ", "") + + for tag := range strings.SplitSeq(ddtagsStr, ",") { + if strings.HasPrefix(tag, "service:") { + if service == "" { + service = tag[8:] + } + continue + } + + tags = append(tags, tag) + } + + delete(jsonMessage, DdtagsKey) + + newMessage, err := json.Marshal(jsonMessage) + if err != nil { + return nil, service, message + } + + return tags, service, string(newMessage) +} diff --git a/aws/logs_monitoring_go/internal/parsing/tags_test.go b/aws/logs_monitoring_go/internal/parsing/tags_test.go new file mode 100644 index 000000000..dfa873892 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/tags_test.go @@ -0,0 +1,164 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "slices" + "strings" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +func TestExtractFromMessage(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + message string + wantTags model.Tags + wantService string + wantMessage string + }{ + "empty string": { + message: "", + wantTags: nil, + wantService: "", + wantMessage: "", + }, + "plain text": { + message: "ERROR something went wrong", + wantTags: nil, + wantService: "", + wantMessage: "ERROR something went wrong", + }, + "invalid json": { + message: `{not valid json}`, + wantTags: nil, + wantService: "", + wantMessage: `{not valid json}`, + }, + "json without ddtags": { + message: `{"level":"INFO","msg":"hello"}`, + wantTags: nil, + wantService: "", + wantMessage: `{"level":"INFO","msg":"hello"}`, + }, + "ddtags is not a string": { + message: `{"ddtags":["tag1","tag2"]}`, + wantTags: nil, + wantService: "", + wantMessage: `{"ddtags":["tag1","tag2"]}`, + }, + "single tag": { + message: `{"msg":"hello","ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "multiple tags": { + message: `{"msg":"hello","ddtags":"env:prod,team:backend"}`, + wantTags: model.Tags{"env:prod", "team:backend"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "tags with spaces are cleaned": { + message: `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + wantTags: model.Tags{"env:prod", "team:backend", "version:1.0"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "service tag extracted": { + message: `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "service only": { + message: `{"msg":"hello","ddtags":"service:my-app"}`, + wantTags: nil, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "first service wins": { + message: `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "first", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags is empty string": { + message: `{"msg":"hello","ddtags":""}`, + wantTags: model.Tags{""}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags only field in json": { + message: `{"ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{}`, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + gotTags, gotService, gotMessage := extractFromMessage(tc.message) + + if !slices.Equal(gotTags, tc.wantTags) { + t.Errorf("tags: got %v, want %v", gotTags, tc.wantTags) + } + if gotService != tc.wantService { + t.Errorf("service: got %q, want %q", gotService, tc.wantService) + } + if gotMessage != tc.wantMessage { + t.Errorf("message: got %q, want %q", gotMessage, tc.wantMessage) + } + }) + } +} + +func FuzzExtractFromMessage(f *testing.F) { + seeds := []string{ + "", + "plain text, not json", + `{not valid json}`, + `{"msg":"hello"}`, + `{"ddtags":["tag1","tag2"]}`, + `{"ddtags":42}`, + `{"msg":"hello","ddtags":"env:prod"}`, + `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + `{"msg":"hello","ddtags":"service:my-app"}`, + `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + `{"msg":"hello","ddtags":""}`, + `{"ddtags":"env:prod"}`, + } + for _, seed := range seeds { + f.Add(seed) + } + + f.Fuzz(func(t *testing.T, message string) { + tags, _, outMessage := extractFromMessage(message) + + if outMessage != message { + var parsed map[string]any + if err := json.Unmarshal([]byte(outMessage), &parsed); err != nil { + t.Errorf("output message is not valid JSON: %v", err) + } + if _, ok := parsed[DdtagsKey]; ok { + t.Errorf("output message still contains %q key", DdtagsKey) + } + } + + for _, tag := range tags { + if strings.HasPrefix(tag, "service:") { + t.Errorf("tag %q should have been extracted as service, not returned in tags", tag) + } + } + }) +} From d9eb163d6206fcf7128cb4361cb703ee93754e62 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 14 Apr 2026 17:51:43 +0200 Subject: [PATCH 14/14] fix missing dd-storage-tag header --- aws/logs_monitoring_go/internal/forwarding/forwarding.go | 1 + 1 file changed, 1 insertion(+) diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index a68901dab..fc2b3b195 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -78,6 +78,7 @@ func (f Forwarder) send(ctx context.Context, body []byte) error { req.Header.Set("Content-Encoding", "gzip") req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder") req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion) + req.Header.Set("DD-STORAGE-TAG", "cloudwatch") resp, err := f.client.Do(req) if err != nil {