Skip to content

Commit e58bcc6

Browse files
committed
[AWSINTS-3450] feat(go-forwarder): add CloudwatchLogEntry creation logic
1 parent 5a38306 commit e58bcc6

3 files changed

Lines changed: 173 additions & 4 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"log/slog"
12+
"strings"
13+
14+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
16+
"github.com/aws/aws-lambda-go/events"
17+
"github.com/aws/aws-lambda-go/lambdacontext"
18+
)
19+
20+
func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) {
21+
var cwEvent events.CloudwatchLogsEvent
22+
if err := json.Unmarshal(event, &cwEvent); err != nil {
23+
slog.Error("failed to unmarshal cloudwatch event", slog.Any("error", err))
24+
return
25+
}
26+
27+
data, err := cwEvent.AWSLogs.Parse()
28+
if err != nil {
29+
slog.Error("failed to decompress cloudwatch data", slog.Any("error", err))
30+
return
31+
}
32+
33+
if data.MessageType == "CONTROL_MESSAGE" {
34+
return
35+
}
36+
37+
source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream)
38+
metadata := getCloudwatchMetadata(ctx, data)
39+
host := getCloudwatchHost(cfg.Host, data.LogGroup)
40+
tags, service := getTagsAndService(*cfg)
41+
if service == "" {
42+
service = source
43+
}
44+
45+
for _, le := range data.LogEvents {
46+
entry := model.CloudwatchLogEntry{
47+
ID: le.ID,
48+
Timestamp: le.Timestamp,
49+
Message: le.Message,
50+
Source: source,
51+
Service: service,
52+
Host: host,
53+
Tags: tags,
54+
AWS: metadata,
55+
}
56+
57+
select {
58+
case out <- entry:
59+
case <-ctx.Done():
60+
return
61+
}
62+
}
63+
}
64+
65+
func getCloudwatchSource(sourceOverride, logGroup, logStream string) string {
66+
if sourceOverride != "" {
67+
return sourceOverride
68+
}
69+
70+
var source string
71+
if strings.Contains(logStream, "_CloudTrail_") {
72+
source = "cloudtrail"
73+
} else {
74+
source = getSourceFromLogGroup(strings.ToLower(logGroup))
75+
}
76+
77+
if strings.HasPrefix(logStream, "states/") {
78+
source = "stepfunction"
79+
}
80+
81+
return source
82+
}
83+
84+
func getSourceFromLogGroup(logGroupLower string) string {
85+
if strings.HasPrefix(logGroupLower, "_cloudtrail_") {
86+
return "cloudtrail"
87+
}
88+
if strings.HasPrefix(logGroupLower, "/aws/kinesis") {
89+
return "kinesis"
90+
}
91+
if strings.HasPrefix(logGroupLower, "/aws/lambda") {
92+
return "lambda"
93+
}
94+
if strings.HasPrefix(logGroupLower, "sns/") {
95+
return "sns"
96+
}
97+
if strings.Contains(logGroupLower, "cloudtrail") {
98+
return "cloudtrail"
99+
}
100+
return "cloudwatch"
101+
}
102+
103+
func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) model.CloudwatchMetadata {
104+
metadata := model.CloudwatchMetadata{
105+
Logs: model.CloudwatchLogsContext{
106+
LogGroup: data.LogGroup,
107+
LogStream: data.LogStream,
108+
Owner: data.Owner,
109+
},
110+
}
111+
112+
if lambdacontext.FunctionVersion != "$LATEST" {
113+
metadata.FunctionVersion = lambdacontext.FunctionVersion
114+
}
115+
116+
if lc, ok := lambdacontext.FromContext(ctx); ok {
117+
metadata.InvokedFunctionARN = lc.InvokedFunctionArn
118+
} else {
119+
slog.Warn("failed lambda context loading")
120+
}
121+
122+
return metadata
123+
}
124+
125+
func getCloudwatchHost(hostOverride, logGroup string) string {
126+
if hostOverride != "" {
127+
return hostOverride
128+
}
129+
130+
return logGroup
131+
}

aws/logs_monitoring_go/internal/parsing/parsing.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,3 @@ func detectFromRecords(dec *json.Decoder) invocationSource {
100100

101101
return invocationSourceUnknown
102102
}
103-
104-
func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) {
105-
106-
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"strings"
10+
11+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
12+
"github.com/aws/aws-lambda-go/lambdacontext"
13+
)
14+
15+
func getTagsAndService(cfg config.Config) ([]string, string) {
16+
var tags []string
17+
var service string
18+
19+
if cfg.CustomTags != "" {
20+
for _, tag := range strings.Split(cfg.CustomTags, ",") {
21+
if strings.HasPrefix(tag, "service:") {
22+
if service != "" {
23+
continue
24+
}
25+
service = tag[8:]
26+
}
27+
28+
tags = append(tags, tag)
29+
}
30+
}
31+
32+
if cfg.Source != "" {
33+
tags = append(tags, "source_overridden:true")
34+
}
35+
36+
tags = append(tags,
37+
"forwardername:"+strings.ToLower(lambdacontext.FunctionName),
38+
"forwarder_version:"+config.ForwarderVersion,
39+
)
40+
41+
return tags, service
42+
}

0 commit comments

Comments
 (0)