Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"log/slog"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"

"github.com/aws/aws-lambda-go/lambda"
)
Expand All @@ -26,10 +28,15 @@ func main() {
lambda.Start(handleRequest(cfg))
}

// cfg not used for now, will be when forwarding logic added
func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
slog.Info("received event", slog.String("event", string(event)))
return nil
invocationSource := parsing.DetectInvocationSource(event)
switch invocationSource {
case parsing.InvocationSourceCloudwatchLogs:
return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs)
default:
slog.Error("unsupported invocation source", slog.String("source", invocationSource.String()))
return nil
}
}
}
52 changes: 35 additions & 17 deletions aws/logs_monitoring_go/internal/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,48 @@ import (

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"golang.org/x/sync/errgroup"
)

const numWorkers = 3

type Forwarder struct {
Config config.Config
Client *http.Client
config *config.Config
client *http.Client
}

func NewForwarder(config *config.Config, client *http.Client) Forwarder {
return Forwarder{
config: config,
client: client,
}
}

func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error {
for {
body, ok, err := concurrent.SafeReader(ctx, in)
if err != nil {
return err
}
if !ok {
break
}
g, ctx := errgroup.WithContext(ctx)

if err := f.send(ctx, body); err != nil {
return err
}
for range numWorkers {
g.Go(func() error {
for {
body, ok, err := concurrent.SafeReader(ctx, in)
if err != nil {
return err
}
if !ok {
return nil

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ issue: ‏There is a concurrency issue in this function.

We have 3 workers. In any worker encounter an error, it exits and stop, stopping to dequeue from the input channel.

If the 3 workers fail, the in chan will be full, nothing will dequeue it, introducing a deadlock for the upstream goroutine.

The only call of Forward is here but the ctx is not managed by the errgroup.Group. So if the goroutine fails, there is no signal sent to other goroutine to stop.

We should fix the context cancellation in the pipeline file and maybe leave a TODO here to handle the retry mechanism (later in the migration).

}

if err := f.send(ctx, body); err != nil {
return err
}
}
})
}

return nil
return g.Wait()
}

// TODO: add retry mechanism for resiliency
func (f Forwarder) send(ctx context.Context, body []byte) error {
var compressedBody bytes.Buffer
zw := gzip.NewWriter(&compressedBody)
Expand All @@ -51,18 +68,19 @@ func (f Forwarder) send(ctx context.Context, body []byte) error {
return fmt.Errorf("closing gzip writer: %w", err)
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.Config.IntakeURL, bytes.NewReader(compressedBody.Bytes()))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.config.IntakeURL, bytes.NewReader(compressedBody.Bytes()))
if err != nil {
return err
}

req.Header.Set("DD-API-KEY", f.Config.APIKey)
req.Header.Set("DD-API-KEY", f.config.APIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder")
req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion)
req.Header.Set("DD-STORAGE-TAG", "cloudwatch")

resp, err := f.Client.Do(req)
resp, err := f.client.Do(req)
if err != nil {
return fmt.Errorf("sending to intake: %w", err)
}
Expand Down
11 changes: 4 additions & 7 deletions aws/logs_monitoring_go/internal/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,10 @@ func TestForward(t *testing.T) {
}))
defer server.Close()

f := Forwarder{
Config: config.Config{
IntakeURL: server.URL,
APIKey: "test-api-key",
},
Client: server.Client(),
}
f := NewForwarder(&config.Config{
IntakeURL: server.URL,
APIKey: "test-api-key",
}, server.Client())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
148 changes: 148 additions & 0 deletions aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package parsing

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambdacontext"
)

func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error {
logEntries, err := parseCloudwatchLogs(ctx, event, cfg)
if err != nil {
return err
}

for _, logEntry := range logEntries {
if err := concurrent.SafeSender(ctx, out, logEntry); err != nil {
return err
}
}

return nil
}

func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config) ([]model.CloudwatchLogEntry, error) {
var cwEvent events.CloudwatchLogsEvent
if err := json.Unmarshal(event, &cwEvent); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}

data, err := cwEvent.AWSLogs.Parse()
if err != nil {
return nil, fmt.Errorf("parse: %w", err)
}

if data.MessageType == "CONTROL_MESSAGE" {
return nil, nil
}
Comment on lines +48 to +50

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not present in the Python implementation. the control message are probing messages from cloudwatch to verify that the lambda is reachable => noise.


source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream)
metadata := getCloudwatchMetadata(ctx, data)
host := getCloudwatchHost(cfg.Host, data.LogGroup)
tags, service := getTagsAndService(*cfg)
if service == "" {
service = source
}

var entries []model.CloudwatchLogEntry
for _, le := range data.LogEvents {
ddtags, ddtagsService, message := extractFromMessage(le.Message)
entryService := service
if ddtagsService != "" {
entryService = ddtagsService
}
ddtags = append(ddtags, "service:"+entryService)

entry := model.CloudwatchLogEntry{
ID: le.ID,
Timestamp: le.Timestamp,
Message: message,
Source: source,
Service: entryService,
Host: host,
Tags: append(ddtags, tags...),
AWS: metadata,
}
entries = append(entries, entry)
}

return entries, nil
}

func getCloudwatchSource(sourceOverride, logGroup, logStream string) string {
if sourceOverride != "" {
return sourceOverride
}

if strings.HasPrefix(logStream, "states/") {
return "stepfunction"
}

if strings.Contains(logStream, "_CloudTrail_") {
return "cloudtrail"
}

return getSourceFromLogGroup(strings.ToLower(logGroup))
}

func getSourceFromLogGroup(logGroupLower string) string {
if strings.HasPrefix(logGroupLower, "_cloudtrail_") {
return "cloudtrail"
}
if strings.HasPrefix(logGroupLower, "/aws/kinesis") {
return "kinesis"
}
if strings.HasPrefix(logGroupLower, "/aws/lambda") {
return "lambda"
}
if strings.HasPrefix(logGroupLower, "sns/") {
return "sns"
}
if strings.Contains(logGroupLower, "cloudtrail") {
return "cloudtrail"
}
return "cloudwatch"
}

func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) model.CloudwatchMetadata {
metadata := model.CloudwatchMetadata{
Logs: model.CloudwatchLogsContext{
LogGroup: data.LogGroup,
LogStream: data.LogStream,
Owner: data.Owner,
},
}

if lambdacontext.FunctionVersion != "$LATEST" {
metadata.FunctionVersion = lambdacontext.FunctionVersion
}

if lc, ok := lambdacontext.FromContext(ctx); ok {
metadata.InvokedFunctionARN = lc.InvokedFunctionArn
} else {
slog.Warn("failed to load lambda context, this should not happen in production. The code is either not running from AWS Lambda or context is broken.")
}

return metadata
}

func getCloudwatchHost(hostOverride, logGroup string) string {
if hostOverride != "" {
return hostOverride
}

return logGroup
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading