diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go index 0a80a5f91..3c07d0a6f 100644 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -58,14 +58,21 @@ func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config var entries []model.CloudwatchLogEntry for _, le := range data.LogEvents { + ddtags, ddtagsService, message := extractFromMessage(le.Message) + entryService := service + if ddtagsService != "" { + entryService = ddtagsService + } + ddtags = append(ddtags, "service:"+entryService) + entry := model.CloudwatchLogEntry{ ID: le.ID, Timestamp: le.Timestamp, - Message: le.Message, + Message: message, Source: source, - Service: service, + Service: entryService, Host: host, - Tags: tags, + Tags: append(ddtags, tags...), AWS: metadata, } entries = append(entries, entry) diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go index 6530c647a..6c3b94f77 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags.go +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -6,6 +6,7 @@ package parsing import ( + "encoding/json" "strings" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" @@ -13,17 +14,19 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" ) +const DdtagsKey = "ddtags" + func getTagsAndService(cfg config.Config) (model.Tags, string) { var tags model.Tags var service string if cfg.CustomTags != "" { - for _, tag := range strings.Split(cfg.CustomTags, ",") { + for tag := range strings.SplitSeq(cfg.CustomTags, ",") { if strings.HasPrefix(tag, "service:") { - if service != "" { - continue + if service == "" { + service = tag[8:] } - service = tag[8:] + continue } tags = append(tags, tag) @@ -41,3 +44,45 @@ func getTagsAndService(cfg config.Config) (model.Tags, string) { return tags, service } + +func extractFromMessage(message string) (model.Tags, string, string) { + var tags model.Tags + var service string + + var jsonMessage map[string]any + if err := json.Unmarshal([]byte(message), &jsonMessage); err != nil { + return nil, service, message + } + + ddtagsRaw, ok := jsonMessage[DdtagsKey] + if !ok { + return nil, service, message + } + + ddtagsStr, ok := ddtagsRaw.(string) + if !ok { + return nil, service, message + } + + ddtagsStr = strings.ReplaceAll(ddtagsStr, " ", "") + + for tag := range strings.SplitSeq(ddtagsStr, ",") { + if strings.HasPrefix(tag, "service:") { + if service == "" { + service = tag[8:] + } + continue + } + + tags = append(tags, tag) + } + + delete(jsonMessage, DdtagsKey) + + newMessage, err := json.Marshal(jsonMessage) + if err != nil { + return nil, service, message + } + + return tags, service, string(newMessage) +} diff --git a/aws/logs_monitoring_go/internal/parsing/tags_test.go b/aws/logs_monitoring_go/internal/parsing/tags_test.go new file mode 100644 index 000000000..dfa873892 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/tags_test.go @@ -0,0 +1,164 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "slices" + "strings" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +func TestExtractFromMessage(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + message string + wantTags model.Tags + wantService string + wantMessage string + }{ + "empty string": { + message: "", + wantTags: nil, + wantService: "", + wantMessage: "", + }, + "plain text": { + message: "ERROR something went wrong", + wantTags: nil, + wantService: "", + wantMessage: "ERROR something went wrong", + }, + "invalid json": { + message: `{not valid json}`, + wantTags: nil, + wantService: "", + wantMessage: `{not valid json}`, + }, + "json without ddtags": { + message: `{"level":"INFO","msg":"hello"}`, + wantTags: nil, + wantService: "", + wantMessage: `{"level":"INFO","msg":"hello"}`, + }, + "ddtags is not a string": { + message: `{"ddtags":["tag1","tag2"]}`, + wantTags: nil, + wantService: "", + wantMessage: `{"ddtags":["tag1","tag2"]}`, + }, + "single tag": { + message: `{"msg":"hello","ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "multiple tags": { + message: `{"msg":"hello","ddtags":"env:prod,team:backend"}`, + wantTags: model.Tags{"env:prod", "team:backend"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "tags with spaces are cleaned": { + message: `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + wantTags: model.Tags{"env:prod", "team:backend", "version:1.0"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "service tag extracted": { + message: `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "service only": { + message: `{"msg":"hello","ddtags":"service:my-app"}`, + wantTags: nil, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "first service wins": { + message: `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "first", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags is empty string": { + message: `{"msg":"hello","ddtags":""}`, + wantTags: model.Tags{""}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags only field in json": { + message: `{"ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{}`, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + gotTags, gotService, gotMessage := extractFromMessage(tc.message) + + if !slices.Equal(gotTags, tc.wantTags) { + t.Errorf("tags: got %v, want %v", gotTags, tc.wantTags) + } + if gotService != tc.wantService { + t.Errorf("service: got %q, want %q", gotService, tc.wantService) + } + if gotMessage != tc.wantMessage { + t.Errorf("message: got %q, want %q", gotMessage, tc.wantMessage) + } + }) + } +} + +func FuzzExtractFromMessage(f *testing.F) { + seeds := []string{ + "", + "plain text, not json", + `{not valid json}`, + `{"msg":"hello"}`, + `{"ddtags":["tag1","tag2"]}`, + `{"ddtags":42}`, + `{"msg":"hello","ddtags":"env:prod"}`, + `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + `{"msg":"hello","ddtags":"service:my-app"}`, + `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + `{"msg":"hello","ddtags":""}`, + `{"ddtags":"env:prod"}`, + } + for _, seed := range seeds { + f.Add(seed) + } + + f.Fuzz(func(t *testing.T, message string) { + tags, _, outMessage := extractFromMessage(message) + + if outMessage != message { + var parsed map[string]any + if err := json.Unmarshal([]byte(outMessage), &parsed); err != nil { + t.Errorf("output message is not valid JSON: %v", err) + } + if _, ok := parsed[DdtagsKey]; ok { + t.Errorf("output message still contains %q key", DdtagsKey) + } + } + + for _, tag := range tags { + if strings.HasPrefix(tag, "service:") { + t.Errorf("tag %q should have been extracted as service, not returned in tags", tag) + } + } + }) +}