-
Notifications
You must be signed in to change notification settings - Fork 390
[AWSINTS-3450] feat(go-forwarder): add CloudwatchLogEntry creation logic #1094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
363def9
62971ce
87f74ed
1757e45
d760674
4dbd0e9
8795b6b
c4d9c81
d05ffdb
9dd1b5a
d543cfa
5349a82
aba33a6
d9eb163
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2026-Present Datadog, Inc. | ||
|
|
||
| package parsing | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "log/slog" | ||
| "strings" | ||
|
|
||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" | ||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" | ||
| "github.com/aws/aws-lambda-go/events" | ||
| "github.com/aws/aws-lambda-go/lambdacontext" | ||
| ) | ||
|
|
||
| func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { | ||
| logEntries, err := parseCloudwatchLogs(ctx, event, cfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for _, logEntry := range logEntries { | ||
| if err := concurrent.SafeSender(ctx, out, logEntry); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config) ([]model.CloudwatchLogEntry, error) { | ||
| var cwEvent events.CloudwatchLogsEvent | ||
| if err := json.Unmarshal(event, &cwEvent); err != nil { | ||
| return nil, fmt.Errorf("unmarshal: %w", err) | ||
| } | ||
|
|
||
| data, err := cwEvent.AWSLogs.Parse() | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse: %w", err) | ||
| } | ||
|
|
||
| if data.MessageType == "CONTROL_MESSAGE" { | ||
| return nil, nil | ||
| } | ||
|
Comment on lines
+48
to
+50
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not present in the Python implementation. the control message are probing messages from cloudwatch to verify that the lambda is reachable => noise. |
||
|
|
||
| source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream) | ||
| metadata := getCloudwatchMetadata(ctx, data) | ||
| host := getCloudwatchHost(cfg.Host, data.LogGroup) | ||
| tags, service := getTagsAndService(*cfg) | ||
| if service == "" { | ||
| service = source | ||
| } | ||
|
|
||
| var entries []model.CloudwatchLogEntry | ||
| for _, le := range data.LogEvents { | ||
| ddtags, ddtagsService, message := extractFromMessage(le.Message) | ||
| entryService := service | ||
| if ddtagsService != "" { | ||
| entryService = ddtagsService | ||
| } | ||
| ddtags = append(ddtags, "service:"+entryService) | ||
|
|
||
| entry := model.CloudwatchLogEntry{ | ||
| ID: le.ID, | ||
| Timestamp: le.Timestamp, | ||
| Message: message, | ||
| Source: source, | ||
| Service: entryService, | ||
| Host: host, | ||
| Tags: append(ddtags, tags...), | ||
| AWS: metadata, | ||
| } | ||
| entries = append(entries, entry) | ||
| } | ||
|
|
||
| return entries, nil | ||
| } | ||
|
|
||
| func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { | ||
| if sourceOverride != "" { | ||
| return sourceOverride | ||
| } | ||
|
|
||
| if strings.HasPrefix(logStream, "states/") { | ||
| return "stepfunction" | ||
| } | ||
|
|
||
| if strings.Contains(logStream, "_CloudTrail_") { | ||
| return "cloudtrail" | ||
| } | ||
|
|
||
| return getSourceFromLogGroup(strings.ToLower(logGroup)) | ||
| } | ||
|
|
||
| func getSourceFromLogGroup(logGroupLower string) string { | ||
| if strings.HasPrefix(logGroupLower, "_cloudtrail_") { | ||
| return "cloudtrail" | ||
| } | ||
| if strings.HasPrefix(logGroupLower, "/aws/kinesis") { | ||
| return "kinesis" | ||
| } | ||
| if strings.HasPrefix(logGroupLower, "/aws/lambda") { | ||
| return "lambda" | ||
| } | ||
| if strings.HasPrefix(logGroupLower, "sns/") { | ||
| return "sns" | ||
| } | ||
| if strings.Contains(logGroupLower, "cloudtrail") { | ||
| return "cloudtrail" | ||
| } | ||
| return "cloudwatch" | ||
| } | ||
|
|
||
| func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) model.CloudwatchMetadata { | ||
| metadata := model.CloudwatchMetadata{ | ||
| Logs: model.CloudwatchLogsContext{ | ||
| LogGroup: data.LogGroup, | ||
| LogStream: data.LogStream, | ||
| Owner: data.Owner, | ||
| }, | ||
| } | ||
|
|
||
| if lambdacontext.FunctionVersion != "$LATEST" { | ||
| metadata.FunctionVersion = lambdacontext.FunctionVersion | ||
| } | ||
|
|
||
| if lc, ok := lambdacontext.FromContext(ctx); ok { | ||
| metadata.InvokedFunctionARN = lc.InvokedFunctionArn | ||
| } else { | ||
| slog.Warn("failed to load lambda context, this should not happen in production. The code is either not running from AWS Lambda or context is broken.") | ||
| } | ||
|
|
||
| return metadata | ||
| } | ||
|
|
||
| func getCloudwatchHost(hostOverride, logGroup string) string { | ||
| if hostOverride != "" { | ||
| return hostOverride | ||
| } | ||
|
|
||
| return logGroup | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have 3 workers. In any worker encounter an error, it exits and stop, stopping to dequeue from the input channel.
If the 3 workers fail, the
inchan will be full, nothing will dequeue it, introducing a deadlock for the upstream goroutine.The only call of Forward is here but the
ctxis not managed by theerrgroup.Group. So if the goroutine fails, there is no signal sent to other goroutine to stop.We should fix the context cancellation in the pipeline file and maybe leave a TODO here to handle the retry mechanism (later in the migration).