Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,49 @@ package main
import (
"context"
"encoding/json"
"log/slog"
"errors"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"

"github.com/aws/aws-lambda-go/lambda"
)

func main() {
ctx := context.Background()
cfg, err := config.Load(ctx)
cfg, err := config.Load()
if err != nil {
slog.Error("config load failed", slog.Any("error", err))
return
panic(err)
}
err = cfg.ResolveAPIKey(context.Background())
if err != nil {
panic(err)
}
err = cfg.ValidateAPIKey(context.Background())
if err != nil {
panic(err)
}

cwHandler := handling.NewCloudwatch(cfg)
kinesisHandler := handling.NewKinesis(cfg)
s3Handler := handling.NewS3(cfg)
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
handling.Register(parsing.InvocationSourceS3, s3Handler)

lambda.Start(handleRequest(cfg))
}

func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error {
func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
invocationSource := parsing.DetectInvocationSource(event)
switch invocationSource {
case parsing.InvocationSourceCloudwatchLogs:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatch)
case parsing.InvocationSourceKinesis:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
case parsing.InvocationSourceS3:
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
default:
slog.Error("unsupported invocation source", slog.String("source", invocationSource.String()))
return nil
invocation := parsing.DetectInvocationSource(event)
if invocation == parsing.InvocationSourceUnknown {
return errors.New("unknown invocation")
}

run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
return pipeline.Start(ctx, event, run)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package processing
package batching

import (
"bytes"
Expand All @@ -12,6 +12,7 @@ import (
"log/slog"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

const (
Expand All @@ -20,18 +21,18 @@ const (
maxItemsPerBatch = 1000
)

type Batcher[T any] struct {
type Batcher struct {
batch [][]byte
batchSize int
}

func NewBatcher[T any]() *Batcher[T] {
return &Batcher[T]{
func NewBatcher() *Batcher {
return &Batcher{
batch: make([][]byte, 0, maxItemsPerBatch),
}
}

func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte) error {
func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error {
for {
entry, ok, err := concurrent.SafeReader(ctx, in)
if err != nil {
Expand Down Expand Up @@ -66,7 +67,7 @@ func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte)
}
}

func (b *Batcher[T]) flush(ctx context.Context, out chan<- []byte) error {
func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error {
if len(b.batch) == 0 {
return nil
}
Expand Down
92 changes: 92 additions & 0 deletions aws/logs_monitoring_go/internal/batching/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package batching

import (
"encoding/json"
"strings"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

func TestBatch(t *testing.T) {
t.Parallel()

tests := map[string]struct {
entries []model.LogEntry
wantBatchCount int
wantEntryCounts []int
}{
"empty": {
entries: nil,
wantBatchCount: 0,
},
"single entry": {
entries: []model.LogEntry{model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{1},
},
"multiple entries, one batch": {
entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{3},
},
"drop oversized entry": {
entries: func() []model.LogEntry {
entry := model.NewLogEntry()
entry.Message = strings.Repeat("a", maxItemSize+1)
return []model.LogEntry{entry}
}(),
wantBatchCount: 0,
wantEntryCounts: nil,
},
"split": {
entries: make([]model.LogEntry, maxItemsPerBatch+1),
wantBatchCount: 2,
wantEntryCounts: []int{maxItemsPerBatch, 1},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

in := make(chan model.LogEntry, len(tc.entries))
out := make(chan []byte, len(tc.wantEntryCounts))
for _, entry := range tc.entries {
in <- entry
}
close(in)

batcher := NewBatcher()
err := batcher.Batch(t.Context(), in, out)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
close(out)

var batches [][]byte
for b := range out {
batches = append(batches, b)
}

if len(batches) != tc.wantBatchCount {
t.Fatalf("Batch() got %d batches, want %d", len(batches), tc.wantBatchCount)
}

for i, wantCount := range tc.wantEntryCounts {
var entries []model.LogEntry
if err := json.Unmarshal(batches[i], &entries); err != nil {
t.Fatalf("Batch() failed to unmarshal batch %d: %v", i, err)
}
if len(entries) != wantCount {
t.Errorf("Batch() batch %d: got %d entries, want %d", i, len(entries), wantCount)
}
}
})
}
}
4 changes: 2 additions & 2 deletions aws/logs_monitoring_go/internal/config/apikey.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type apiKeyResolver struct {
resolve func(ctx context.Context, value string) (string, error)
}

func (c *Config) resolveAPIKey(ctx context.Context) error {
func (c *Config) ResolveAPIKey(ctx context.Context) error {
resolvers := []apiKeyResolver{
{"DD_API_KEY_SECRET_ARN", c.resolveAPIKeyFromSecretsManager},
{"DD_API_KEY_SSM_NAME", c.resolveAPIKeyFromSSM},
Expand All @@ -55,7 +55,7 @@ func (c *Config) resolveAPIKey(ctx context.Context) error {
return errors.New("no API key configured: set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/")
}

func (c *Config) validateAPIKey(ctx context.Context) error {
func (c *Config) ValidateAPIKey(ctx context.Context) error {
if c.APIKey == "" {
return fmt.Errorf("set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/: %w", ErrMissingAPIKey)
}
Expand Down
120 changes: 46 additions & 74 deletions aws/logs_monitoring_go/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
package config

import (
"context"
"errors"
"fmt"
"log/slog"
"regexp"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
)

const ForwarderVersion = "6.0"
Expand All @@ -23,96 +26,65 @@ type Config struct {
UseFIPS bool
Source string
Host string
CustomTags string
Scrubbing ScrubbingConfig
Filtering FilteringConfig
Tags model.Tags
Service string
Scrubber *scrubbing.Scrubber
Filter *filtering.Filter
S3MultilineLogRegex *regexp.Regexp
}

type ScrubbingConfig struct {
ScrubIP bool
ScrubEmail bool
CustomRule string
CustomReplacement string
}

type FilteringConfig struct {
IncludePattern string
ExcludePattern string
}

func Load(ctx context.Context) (*Config, error) {
initLogger(envOrDefault("DD_LOG_LEVEL", "INFO"))
func Load() (*Config, error) {
logLevel := envOrDefault("DD_LOG_LEVEL", "INFO")
initLogger(logLevel)
logDroppedEnvVars()

cfg := loadConfig()
slog.Debug("config loaded", "config", cfg)
var cfg Config
cfg.LogLevel = logLevel
cfg.loadEnv()
cfg.extractFromEnv()

if err := cfg.resolveAPIKey(ctx); err != nil {
return nil, fmt.Errorf("resolving API key: %w", err)
}
err := cfg.compileS3MultilineLogRegex()

scrubber, scrubbingErr := scrubbing.NewScrubber(
envOrDefault("DD_SCRUBBING_RULE", ""),
envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""),
envOrDefaultBool("REDACT_IP", false),
envOrDefaultBool("REDACT_EMAIL", false),
)
err = errors.Join(err, scrubbingErr)

if err := cfg.validateAPIKey(ctx); err != nil {
return nil, fmt.Errorf("validating API key: %w", err)
filter, filteringErr := filtering.NewFilter(
envOrDefault("INCLUDE_AT_MATCH", ""),
envOrDefault("EXCLUDE_AT_MATCH", ""),
)
err = errors.Join(err, filteringErr)
if err != nil {
return nil, err
}

return cfg, nil
cfg.Scrubber = scrubber
cfg.Filter = filter
return &cfg, nil
}

func loadConfig() *Config {
S3MultilineLogRegex := loadS3MultilineLogRegex()
site := envOrDefault("DD_SITE", "datadoghq.com")

return &Config{
Site: site,
IntakeURL: envOrDefault("DD_URL", "https://http-intake.logs."+site+"/api/v2/logs"),
APIURL: envOrDefault("DD_API_URL", "https://api."+site),
LogLevel: envOrDefault("DD_LOG_LEVEL", "INFO"),
UseFIPS: envOrDefaultBool("DD_USE_FIPS", false),
Source: envOrDefault("DD_SOURCE", ""),
Host: envOrDefault("DD_HOST", ""),
CustomTags: envOrDefault("DD_TAGS", ""),
Scrubbing: ScrubbingConfig{
ScrubIP: envOrDefaultBool("REDACT_IP", false),
ScrubEmail: envOrDefaultBool("REDACT_EMAIL", false),
CustomRule: envOrDefault("DD_SCRUBBING_RULE", ""),
CustomReplacement: envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""),
},
Filtering: FilteringConfig{
IncludePattern: envOrDefault("INCLUDE_AT_MATCH", ""),
ExcludePattern: envOrDefault("EXCLUDE_AT_MATCH", ""),
},
S3MultilineLogRegex: S3MultilineLogRegex,
}
func (c *Config) loadEnv() {
c.Site = envOrDefault("DD_SITE", "datadoghq.com")
c.IntakeURL = envOrDefault("DD_URL", "https://http-intake.logs."+c.Site+"/api/v2/logs")
c.APIURL = envOrDefault("DD_API_URL", "https://api."+c.Site)
c.UseFIPS = envOrDefaultBool("DD_USE_FIPS", false)
c.Source = envOrDefault("DD_SOURCE", "")
c.Host = envOrDefault("DD_HOST", "")
}

func loadS3MultilineLogRegex() *regexp.Regexp {
func (c *Config) compileS3MultilineLogRegex() error {
pattern := envOrDefault("DD_MULTILINE_LOG_REGEX_PATTERN", "")
if pattern == "" {
return nil
}

re, err := regexp.Compile(pattern)
if err != nil {
slog.Error("invalid multiline log pattern", slog.String("pattern", pattern), slog.Any("error", err))
return nil
return fmt.Errorf("compile multiline log regex: %w", err)
}

return re
}

func (c Config) LogValue() slog.Value {
return slog.GroupValue(
slog.String("site", c.Site),
slog.String("intakeUrl", c.IntakeURL),
slog.String("apiUrl", c.APIURL),
slog.String("loglevel", c.LogLevel),
slog.Bool("fips", c.UseFIPS),
slog.Bool("redactIP", c.Scrubbing.ScrubIP),
slog.Bool("redactEmail", c.Scrubbing.ScrubEmail),
slog.Bool("customScrubbing", c.Scrubbing.CustomRule != ""),
slog.Bool("includeFilter", c.Filtering.IncludePattern != ""),
slog.Bool("excludeFilter", c.Filtering.ExcludePattern != ""),
slog.Bool("multilineRegex", c.S3MultilineLogRegex != nil),
)
c.S3MultilineLogRegex = re
return nil
}
Loading
Loading