Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) er
invocationSource := parsing.DetectInvocationSource(event)
switch invocationSource {
case parsing.InvocationSourceCloudwatchLogs:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatchLogs)
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatch)
case parsing.InvocationSourceKinesis:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
case parsing.InvocationSourceS3:
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring_go/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"regexp"
)

var ForwarderVersion = "6.0"
const ForwarderVersion = "6.0"

type Config struct {
APIKey string
Expand Down
18 changes: 9 additions & 9 deletions aws/logs_monitoring_go/internal/config/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ func TestEnvOrDefaultBool(t *testing.T) {
fallback bool
want bool
}{
"env_not_set": {value: "", set: false, fallback: false, want: false},
"env_not_set_fallback": {value: "", set: false, fallback: true, want: true},
"true_lowercase": {value: "true", set: true, fallback: false, want: true},
"true_uppercase": {value: "TRUE", set: true, fallback: false, want: true},
"true_mixed_case": {value: "True", set: true, fallback: false, want: true},
"false_value": {value: "false", set: true, fallback: true, want: false},
"one_is_true": {value: "1", set: true, fallback: false, want: true},
"zero_is_false": {value: "0", set: true, fallback: true, want: false},
"env_not_set": {value: "", set: false, fallback: false, want: false},
"env_not_set_fallback": {value: "", set: false, fallback: true, want: true},
"true_lowercase": {value: "true", set: true, fallback: false, want: true},
"true_uppercase": {value: "TRUE", set: true, fallback: false, want: true},
"true_mixed_case": {value: "True", set: true, fallback: false, want: true},
"false_value": {value: "false", set: true, fallback: true, want: false},
"one_is_true": {value: "1", set: true, fallback: false, want: true},
"zero_is_false": {value: "0", set: true, fallback: true, want: false},
"invalid_uses_fallback": {value: "yes", set: true, fallback: true, want: true},
"empty_uses_fallback": {value: "", set: true, fallback: true, want: true},
"empty_uses_fallback": {value: "", set: true, fallback: true, want: true},
}

for name, tc := range tests {
Expand Down
3 changes: 1 addition & 2 deletions aws/logs_monitoring_go/internal/config/kms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package config

import (
"context"
"encoding/base64"
"errors"
"testing"
Expand Down Expand Up @@ -80,7 +79,7 @@ func TestResolveFromKMS(t *testing.T) {
mock := NewMockKMSAPIClient(ctrl)
tc.mockSetup(mock)

got, err := decryptKMSCiphertext(context.Background(), mock, tc.ciphertext)
got, err := decryptKMSCiphertext(t.Context(), mock, tc.ciphertext)
if tc.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
Expand Down
5 changes: 5 additions & 0 deletions aws/logs_monitoring_go/internal/config/logger.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package config

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package config

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -72,7 +71,7 @@ func TestResolveFromSecretsManager(t *testing.T) {
mock := NewMockSecretsManagerAPIClient(ctrl)
tc.mockSetup(mock)

got, err := fetchSecret(context.Background(), mock, tc.arn)
got, err := fetchSecret(t.Context(), mock, tc.arn)
if tc.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
Expand Down
3 changes: 1 addition & 2 deletions aws/logs_monitoring_go/internal/config/ssm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package config

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -90,7 +89,7 @@ func TestResolveFromSSM(t *testing.T) {
mock := NewMockSSMAPIClient(ctrl)
tc.mockSetup(mock)

got, err := fetchSSMParameter(context.Background(), mock, tc.name)
got, err := fetchSSMParameter(t.Context(), mock, tc.name)
if tc.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
Expand Down
3 changes: 3 additions & 0 deletions aws/logs_monitoring_go/internal/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"log/slog"
"net/http"
"time"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
Expand All @@ -25,6 +26,8 @@ const (
S3Storage = "s3"
)

var Client *http.Client = &http.Client{Timeout: 10 * time.Second}

type Forwarder struct {
config *config.Config
client *http.Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func TestForward(t *testing.T) {

rw.WriteHeader(tc.statusCode)
}))
defer server.Close()
t.Cleanup(server.Close)

f := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, server.Client(), tc.storage)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

if tc.cancelCtx {
cancel()
Expand Down
24 changes: 24 additions & 0 deletions aws/logs_monitoring_go/internal/model/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package model

type CloudwatchLogEntry struct {
LogEntry
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Host string `json:"hostname"`
}

type CloudwatchMetadata struct {
LambdaOrigin
Origin CloudwatchOrigin `json:"awslogs"`
}

type CloudwatchOrigin struct {
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
Owner string `json:"owner"`
}
29 changes: 0 additions & 29 deletions aws/logs_monitoring_go/internal/model/cloudwatchlogs.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ import (

var ErrLambdaContextMissing = errors.New("lambda context not found")

type Metadata struct {
type LambdaOrigin struct {
ARN string `json:"invoked_function_arn"`
Version string `json:"function_version,omitempty"`
}

func GetMetadata(ctx context.Context) (Metadata, error) {
var metadata Metadata
func GetLambdaOrigin(ctx context.Context) (LambdaOrigin, error) {
var origin LambdaOrigin

if lambdacontext.FunctionVersion != "$LATEST" {
metadata.Version = lambdacontext.FunctionVersion
origin.Version = lambdacontext.FunctionVersion
}

lc, ok := lambdacontext.FromContext(ctx)
if !ok {
return Metadata{}, ErrLambdaContextMissing
return LambdaOrigin{}, ErrLambdaContextMissing
}

metadata.ARN = lc.InvokedFunctionArn
origin.ARN = lc.InvokedFunctionArn

return metadata, nil
return origin, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,28 @@ import (
"strings"
)

const sourceCategory = "aws"

type LogEntry struct {
Message string `json:"message"`
Source string `json:"ddsource"`
SourceCategory string `json:"ddsourcecategory"`
Service string `json:"service"`
Tags Tags `json:"ddtags"`
Metadata any `json:"aws"`
}

func NewLogEntry(metadata any, tags Tags, message, source, service string) LogEntry {
return LogEntry{
Message: message,
Source: source,
SourceCategory: sourceCategory,
Service: service,
Tags: tags,
Metadata: metadata,
}
}

type Tags []string

func (t Tags) MarshalJSON() ([]byte, error) {
Expand Down
15 changes: 3 additions & 12 deletions aws/logs_monitoring_go/internal/model/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@

package model

type S3LogEntry struct {
Message string `json:"message"`
Source string `json:"ddsource"`
SourceCategory string `json:"ddsourcecategory"`
Service string `json:"service"`
Tags Tags `json:"ddtags"`
Metadata S3Metadata `json:"aws"`
}

type S3Metadata struct {
Metadata
S3Context S3Context `json:"s3"`
LambdaOrigin
Origin S3Origin `json:"s3"`
}

type S3Context struct {
type S3Origin struct {
Bucket string `json:"bucket"`
Key string `json:"key"`
}
Loading
Loading