diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index cc8600a62..d31ad219a 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,10 +8,11 @@ package main import ( "context" "encoding/json" - "log/slog" + "errors" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" @@ -19,29 +20,37 @@ import ( ) func main() { - ctx := context.Background() - cfg, err := config.Load(ctx) + cfg, err := config.Load() if err != nil { - slog.Error("config load failed", slog.Any("error", err)) - return + panic(err) } + err = cfg.ResolveAPIKey(context.Background()) + if err != nil { + panic(err) + } + err = cfg.ValidateAPIKey(context.Background()) + if err != nil { + panic(err) + } + + cwHandler := handling.NewCloudwatch(cfg) + kinesisHandler := handling.NewKinesis(cfg) + s3Handler := handling.NewS3(cfg) + handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler) + handling.Register(parsing.InvocationSourceKinesis, kinesisHandler) + handling.Register(parsing.InvocationSourceS3, s3Handler) lambda.Start(handleRequest(cfg)) } -func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error { +func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - invocationSource := parsing.DetectInvocationSource(event) - switch invocationSource { - case parsing.InvocationSourceCloudwatchLogs: - return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatch) - case parsing.InvocationSourceKinesis: - return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis) - case parsing.InvocationSourceS3: - return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3) - default: - slog.Error("unsupported invocation source", slog.String("source", invocationSource.String())) - return nil + invocation := parsing.DetectInvocationSource(event) + if invocation == parsing.InvocationSourceUnknown { + return errors.New("unknown invocation") } + + run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation)) + return pipeline.Start(ctx, event, run) } } diff --git a/aws/logs_monitoring_go/internal/processing/batching.go b/aws/logs_monitoring_go/internal/batching/batcher.go similarity index 82% rename from aws/logs_monitoring_go/internal/processing/batching.go rename to aws/logs_monitoring_go/internal/batching/batcher.go index 28b7fa14e..1150aebb2 100644 --- a/aws/logs_monitoring_go/internal/processing/batching.go +++ b/aws/logs_monitoring_go/internal/batching/batcher.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package processing +package batching import ( "bytes" @@ -12,6 +12,7 @@ import ( "log/slog" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) const ( @@ -20,18 +21,18 @@ const ( maxItemsPerBatch = 1000 ) -type Batcher[T any] struct { +type Batcher struct { batch [][]byte batchSize int } -func NewBatcher[T any]() *Batcher[T] { - return &Batcher[T]{ +func NewBatcher() *Batcher { + return &Batcher{ batch: make([][]byte, 0, maxItemsPerBatch), } } -func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte) error { +func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error { for { entry, ok, err := concurrent.SafeReader(ctx, in) if err != nil { @@ -66,7 +67,7 @@ func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte) } } -func (b *Batcher[T]) flush(ctx context.Context, out chan<- []byte) error { +func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error { if len(b.batch) == 0 { return nil } diff --git a/aws/logs_monitoring_go/internal/batching/batcher_test.go b/aws/logs_monitoring_go/internal/batching/batcher_test.go new file mode 100644 index 000000000..86303fc0a --- /dev/null +++ b/aws/logs_monitoring_go/internal/batching/batcher_test.go @@ -0,0 +1,92 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package batching + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +func TestBatch(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + entries []model.LogEntry + wantBatchCount int + wantEntryCounts []int + }{ + "empty": { + entries: nil, + wantBatchCount: 0, + }, + "single entry": { + entries: []model.LogEntry{model.NewLogEntry()}, + wantBatchCount: 1, + wantEntryCounts: []int{1}, + }, + "multiple entries, one batch": { + entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()}, + wantBatchCount: 1, + wantEntryCounts: []int{3}, + }, + "drop oversized entry": { + entries: func() []model.LogEntry { + entry := model.NewLogEntry() + entry.Message = strings.Repeat("a", maxItemSize+1) + return []model.LogEntry{entry} + }(), + wantBatchCount: 0, + wantEntryCounts: nil, + }, + "split": { + entries: make([]model.LogEntry, maxItemsPerBatch+1), + wantBatchCount: 2, + wantEntryCounts: []int{maxItemsPerBatch, 1}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + in := make(chan model.LogEntry, len(tc.entries)) + out := make(chan []byte, len(tc.wantEntryCounts)) + for _, entry := range tc.entries { + in <- entry + } + close(in) + + batcher := NewBatcher() + err := batcher.Batch(t.Context(), in, out) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + close(out) + + var batches [][]byte + for b := range out { + batches = append(batches, b) + } + + if len(batches) != tc.wantBatchCount { + t.Fatalf("Batch() got %d batches, want %d", len(batches), tc.wantBatchCount) + } + + for i, wantCount := range tc.wantEntryCounts { + var entries []model.LogEntry + if err := json.Unmarshal(batches[i], &entries); err != nil { + t.Fatalf("Batch() failed to unmarshal batch %d: %v", i, err) + } + if len(entries) != wantCount { + t.Errorf("Batch() batch %d: got %d entries, want %d", i, len(entries), wantCount) + } + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/config/apikey.go b/aws/logs_monitoring_go/internal/config/apikey.go index 81e226b9d..7dbb25a4e 100644 --- a/aws/logs_monitoring_go/internal/config/apikey.go +++ b/aws/logs_monitoring_go/internal/config/apikey.go @@ -30,7 +30,7 @@ type apiKeyResolver struct { resolve func(ctx context.Context, value string) (string, error) } -func (c *Config) resolveAPIKey(ctx context.Context) error { +func (c *Config) ResolveAPIKey(ctx context.Context) error { resolvers := []apiKeyResolver{ {"DD_API_KEY_SECRET_ARN", c.resolveAPIKeyFromSecretsManager}, {"DD_API_KEY_SSM_NAME", c.resolveAPIKeyFromSSM}, @@ -55,7 +55,7 @@ func (c *Config) resolveAPIKey(ctx context.Context) error { return errors.New("no API key configured: set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/") } -func (c *Config) validateAPIKey(ctx context.Context) error { +func (c *Config) ValidateAPIKey(ctx context.Context) error { if c.APIKey == "" { return fmt.Errorf("set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/: %w", ErrMissingAPIKey) } diff --git a/aws/logs_monitoring_go/internal/config/config.go b/aws/logs_monitoring_go/internal/config/config.go index 3de062a76..c2f57701f 100644 --- a/aws/logs_monitoring_go/internal/config/config.go +++ b/aws/logs_monitoring_go/internal/config/config.go @@ -6,10 +6,13 @@ package config import ( - "context" + "errors" "fmt" - "log/slog" "regexp" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing" ) const ForwarderVersion = "6.0" @@ -23,96 +26,65 @@ type Config struct { UseFIPS bool Source string Host string - CustomTags string - Scrubbing ScrubbingConfig - Filtering FilteringConfig + Tags model.Tags + Service string + Scrubber *scrubbing.Scrubber + Filter *filtering.Filter S3MultilineLogRegex *regexp.Regexp } -type ScrubbingConfig struct { - ScrubIP bool - ScrubEmail bool - CustomRule string - CustomReplacement string -} - -type FilteringConfig struct { - IncludePattern string - ExcludePattern string -} - -func Load(ctx context.Context) (*Config, error) { - initLogger(envOrDefault("DD_LOG_LEVEL", "INFO")) +func Load() (*Config, error) { + logLevel := envOrDefault("DD_LOG_LEVEL", "INFO") + initLogger(logLevel) logDroppedEnvVars() - cfg := loadConfig() - slog.Debug("config loaded", "config", cfg) + var cfg Config + cfg.LogLevel = logLevel + cfg.loadEnv() + cfg.extractFromEnv() - if err := cfg.resolveAPIKey(ctx); err != nil { - return nil, fmt.Errorf("resolving API key: %w", err) - } + err := cfg.compileS3MultilineLogRegex() + + scrubber, scrubbingErr := scrubbing.NewScrubber( + envOrDefault("DD_SCRUBBING_RULE", ""), + envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""), + envOrDefaultBool("REDACT_IP", false), + envOrDefaultBool("REDACT_EMAIL", false), + ) + err = errors.Join(err, scrubbingErr) - if err := cfg.validateAPIKey(ctx); err != nil { - return nil, fmt.Errorf("validating API key: %w", err) + filter, filteringErr := filtering.NewFilter( + envOrDefault("INCLUDE_AT_MATCH", ""), + envOrDefault("EXCLUDE_AT_MATCH", ""), + ) + err = errors.Join(err, filteringErr) + if err != nil { + return nil, err } - return cfg, nil + cfg.Scrubber = scrubber + cfg.Filter = filter + return &cfg, nil } -func loadConfig() *Config { - S3MultilineLogRegex := loadS3MultilineLogRegex() - site := envOrDefault("DD_SITE", "datadoghq.com") - - return &Config{ - Site: site, - IntakeURL: envOrDefault("DD_URL", "https://http-intake.logs."+site+"/api/v2/logs"), - APIURL: envOrDefault("DD_API_URL", "https://api."+site), - LogLevel: envOrDefault("DD_LOG_LEVEL", "INFO"), - UseFIPS: envOrDefaultBool("DD_USE_FIPS", false), - Source: envOrDefault("DD_SOURCE", ""), - Host: envOrDefault("DD_HOST", ""), - CustomTags: envOrDefault("DD_TAGS", ""), - Scrubbing: ScrubbingConfig{ - ScrubIP: envOrDefaultBool("REDACT_IP", false), - ScrubEmail: envOrDefaultBool("REDACT_EMAIL", false), - CustomRule: envOrDefault("DD_SCRUBBING_RULE", ""), - CustomReplacement: envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""), - }, - Filtering: FilteringConfig{ - IncludePattern: envOrDefault("INCLUDE_AT_MATCH", ""), - ExcludePattern: envOrDefault("EXCLUDE_AT_MATCH", ""), - }, - S3MultilineLogRegex: S3MultilineLogRegex, - } +func (c *Config) loadEnv() { + c.Site = envOrDefault("DD_SITE", "datadoghq.com") + c.IntakeURL = envOrDefault("DD_URL", "https://http-intake.logs."+c.Site+"/api/v2/logs") + c.APIURL = envOrDefault("DD_API_URL", "https://api."+c.Site) + c.UseFIPS = envOrDefaultBool("DD_USE_FIPS", false) + c.Source = envOrDefault("DD_SOURCE", "") + c.Host = envOrDefault("DD_HOST", "") } -func loadS3MultilineLogRegex() *regexp.Regexp { +func (c *Config) compileS3MultilineLogRegex() error { pattern := envOrDefault("DD_MULTILINE_LOG_REGEX_PATTERN", "") if pattern == "" { return nil } - re, err := regexp.Compile(pattern) if err != nil { - slog.Error("invalid multiline log pattern", slog.String("pattern", pattern), slog.Any("error", err)) - return nil + return fmt.Errorf("compile multiline log regex: %w", err) } - - return re -} - -func (c Config) LogValue() slog.Value { - return slog.GroupValue( - slog.String("site", c.Site), - slog.String("intakeUrl", c.IntakeURL), - slog.String("apiUrl", c.APIURL), - slog.String("loglevel", c.LogLevel), - slog.Bool("fips", c.UseFIPS), - slog.Bool("redactIP", c.Scrubbing.ScrubIP), - slog.Bool("redactEmail", c.Scrubbing.ScrubEmail), - slog.Bool("customScrubbing", c.Scrubbing.CustomRule != ""), - slog.Bool("includeFilter", c.Filtering.IncludePattern != ""), - slog.Bool("excludeFilter", c.Filtering.ExcludePattern != ""), - slog.Bool("multilineRegex", c.S3MultilineLogRegex != nil), - ) + c.S3MultilineLogRegex = re + return nil } diff --git a/aws/logs_monitoring_go/internal/config/config_test.go b/aws/logs_monitoring_go/internal/config/config_test.go index 7e21d6742..54dbed1a0 100644 --- a/aws/logs_monitoring_go/internal/config/config_test.go +++ b/aws/logs_monitoring_go/internal/config/config_test.go @@ -7,104 +7,62 @@ package config import ( "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" ) -func TestLoadConfig(t *testing.T) { +func TestLoad(t *testing.T) { tests := map[string]struct { - env map[string]string - want Config + env map[string]string + wantSite string + wantURL string + wantAPI string + wantSrc string + wantHost string + wantFIPS bool + wantRegex bool + wantErr bool }{ - "default": { - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - UseFIPS: false, - }, - }, - "eu_site": { - env: map[string]string{"DD_SITE": "datadoghq.eu"}, - want: Config{ - Site: "datadoghq.eu", - IntakeURL: "https://http-intake.logs.datadoghq.eu/api/v2/logs", - APIURL: "https://api.datadoghq.eu", - LogLevel: "INFO", - }, + "defaults": { + wantSite: "datadoghq.com", + wantURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", + wantAPI: "https://api.datadoghq.com", }, - "custom_url": { - env: map[string]string{ - "DD_SITE": "datadoghq.com", - "DD_URL": "https://custom-intake.example.com", - }, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://custom-intake.example.com", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - }, + "eu site": { + env: map[string]string{"DD_SITE": "datadoghq.eu"}, + wantSite: "datadoghq.eu", + wantURL: "https://http-intake.logs.datadoghq.eu/api/v2/logs", + wantAPI: "https://api.datadoghq.eu", }, - "custom_source_host_tags": { - env: map[string]string{ - "DD_SOURCE": "custom-source", - "DD_HOST": "my-host", - "DD_TAGS": "env:prod,team:aws", - }, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - Source: "custom-source", - Host: "my-host", - CustomTags: "env:prod,team:aws", - }, + "custom url": { + env: map[string]string{"DD_URL": "https://custom.example.com"}, + wantSite: "datadoghq.com", + wantURL: "https://custom.example.com", + wantAPI: "https://api.datadoghq.com", }, - "scrubbing_ip_enabled": { - env: map[string]string{"REDACT_IP": "true"}, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - Scrubbing: ScrubbingConfig{ScrubIP: true}, - }, + "source and host": { + env: map[string]string{"DD_SOURCE": "custom", "DD_HOST": "my-host"}, + wantSite: "datadoghq.com", + wantURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", + wantAPI: "https://api.datadoghq.com", + wantSrc: "custom", + wantHost: "my-host", }, - "scrubbing_custom_rule": { - env: map[string]string{ - "DD_SCRUBBING_RULE": `\d+`, - "DD_SCRUBBING_RULE_REPLACEMENT": "X", - }, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - Scrubbing: ScrubbingConfig{CustomRule: `\d+`, CustomReplacement: "X"}, - }, + "fips enabled": { + env: map[string]string{"DD_USE_FIPS": "true"}, + wantSite: "datadoghq.com", + wantURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", + wantAPI: "https://api.datadoghq.com", + wantFIPS: true, }, - "filtering_include": { - env: map[string]string{"INCLUDE_AT_MATCH": `error|warn`}, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - Filtering: FilteringConfig{IncludePattern: `error|warn`}, - }, + "valid multiline regex": { + env: map[string]string{"DD_MULTILINE_LOG_REGEX_PATTERN": `\d{4}-\d{2}-\d{2}`}, + wantSite: "datadoghq.com", + wantURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", + wantAPI: "https://api.datadoghq.com", + wantRegex: true, }, - "filtering_exclude": { - env: map[string]string{"EXCLUDE_AT_MATCH": `DEBUG`}, - want: Config{ - Site: "datadoghq.com", - IntakeURL: "https://http-intake.logs.datadoghq.com/api/v2/logs", - APIURL: "https://api.datadoghq.com", - LogLevel: "INFO", - Filtering: FilteringConfig{ExcludePattern: `DEBUG`}, - }, + "invalid multiline regex": { + env: map[string]string{"DD_MULTILINE_LOG_REGEX_PATTERN": `[invalid`}, + wantErr: true, }, } @@ -113,49 +71,39 @@ func TestLoadConfig(t *testing.T) { for k, v := range tc.env { t.Setenv(k, v) } - got := loadConfig() - if diff := cmp.Diff(tc.want, *got, cmpopts.IgnoreFields(Config{}, "APIKey")); diff != "" { - t.Errorf("mismatch (-want +got):\n%s", diff) - } - }) - } -} - -func TestLoadS3MultilineLogRegex(t *testing.T) { - tests := map[string]struct { - env string - wantNil bool - }{ - "empty_pattern_returns_nil": { - env: "", - wantNil: true, - }, - "valid_pattern_returns_compiled_regex": { - env: `\d{4}-\d{2}-\d{2}`, - }, - "invalid_pattern_returns_nil": { - env: `[invalid`, - wantNil: true, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - if tc.env != "" { - t.Setenv("DD_MULTILINE_LOG_REGEX_PATTERN", tc.env) - } - got := loadS3MultilineLogRegex() + got, err := Load() - if tc.wantNil { - if got != nil { - t.Fatalf("want nil, got `%v", got) + if tc.wantErr { + if err == nil { + t.Fatal("want error, got nil") } return } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } - if got == nil { - t.Fatal("want non-nil, got nil") + if got.Site != tc.wantSite { + t.Errorf("Site: got %q, want %q", got.Site, tc.wantSite) + } + if got.IntakeURL != tc.wantURL { + t.Errorf("IntakeURL: got %q, want %q", got.IntakeURL, tc.wantURL) + } + if got.APIURL != tc.wantAPI { + t.Errorf("APIURL: got %q, want %q", got.APIURL, tc.wantAPI) + } + if got.Source != tc.wantSrc { + t.Errorf("Source: got %q, want %q", got.Source, tc.wantSrc) + } + if got.Host != tc.wantHost { + t.Errorf("Host: got %q, want %q", got.Host, tc.wantHost) + } + if got.UseFIPS != tc.wantFIPS { + t.Errorf("UseFIPS: got %v, want %v", got.UseFIPS, tc.wantFIPS) + } + if (got.S3MultilineLogRegex != nil) != tc.wantRegex { + t.Errorf("S3MultilineLogRegex: got nil=%v, want nil=%v", got.S3MultilineLogRegex == nil, !tc.wantRegex) } }) } diff --git a/aws/logs_monitoring_go/internal/config/tags.go b/aws/logs_monitoring_go/internal/config/tags.go new file mode 100644 index 000000000..01a242b7b --- /dev/null +++ b/aws/logs_monitoring_go/internal/config/tags.go @@ -0,0 +1,50 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package config + +import ( + "os" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +const ( + DdtagsJSONKey = "ddtags" + ServiceKey = "service:" + TagSeparator = "," + sourceOverrideKey = "source_overridden:true" + forwarderNameKey = "forwardername:" + forwarderVersionKey = "forwarder_version:" +) + +func (c *Config) extractFromEnv() { + var tags model.Tags + + if customTags := os.Getenv("DD_TAGS"); customTags != "" { + for tag := range strings.SplitSeq(customTags, TagSeparator) { + v, found := strings.CutPrefix(tag, ServiceKey) + if found { + if c.Service == "" { + c.Service = v + } + continue + } + tags = append(tags, tag) + } + } + + if c.Source != "" { + tags = append(tags, sourceOverrideKey) + } + + tags = append(tags, + forwarderNameKey+strings.ToLower(lambdacontext.FunctionName), + forwarderVersionKey+ForwarderVersion, + ) + c.Tags = tags +} diff --git a/aws/logs_monitoring_go/internal/filtering/filter.go b/aws/logs_monitoring_go/internal/filtering/filter.go new file mode 100644 index 000000000..6e2d42a5d --- /dev/null +++ b/aws/logs_monitoring_go/internal/filtering/filter.go @@ -0,0 +1,52 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package filtering + +import ( + "errors" + "fmt" + "regexp" +) + +type Filter struct { + includeRegex *regexp.Regexp + excludeRegex *regexp.Regexp +} + +func NewFilter(includeMatch, excludeMatch string) (*Filter, error) { + includeRegex, includeErr := compilePattern(includeMatch) + excludeRegex, excludeErr := compilePattern(excludeMatch) + + if err := errors.Join(includeErr, excludeErr); err != nil { + return nil, err + } + return &Filter{includeRegex: includeRegex, excludeRegex: excludeRegex}, nil +} + +func compilePattern(pattern string) (*regexp.Regexp, error) { + if pattern == "" { + return nil, nil + } + + re, err := regexp.Compile(pattern) + if err != nil { + return nil, fmt.Errorf("compile '%s': %w", pattern, err) + } + return re, nil +} + +func (f *Filter) ShouldExclude(content string) bool { + if f == nil { + return false + } + if f.excludeRegex != nil && f.excludeRegex.MatchString(content) { + return true + } + if f.includeRegex != nil && !f.includeRegex.MatchString(content) { + return true + } + return false +} diff --git a/aws/logs_monitoring_go/internal/filtering/filter_test.go b/aws/logs_monitoring_go/internal/filtering/filter_test.go new file mode 100644 index 000000000..66e6c8ae9 --- /dev/null +++ b/aws/logs_monitoring_go/internal/filtering/filter_test.go @@ -0,0 +1,97 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package filtering + +import ( + "testing" +) + +func TestFilterShouldExclude(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + include string + exclude string + msg string + want bool + }{ + "no filter": { + msg: "hello", + want: false, + }, + "include match": { + include: `error`, + msg: "an error occurred", + want: false, + }, + "include no match": { + include: `error`, + msg: "hello", + want: true, + }, + "exclude match": { + exclude: `DEBUG`, + msg: "DEBUG message", + want: true, + }, + "exclude no match": { + exclude: `DEBUG`, + msg: "INFO message", + want: false, + }, + "exclude overrides include": { + include: `error`, + exclude: `error`, + msg: "error message", + want: true, + }, + "include match exclude not match": { + include: `error`, + exclude: `DEBUG`, + msg: "error happened", + want: false, + }, + "include and exclude neither match": { + include: `error`, + exclude: `DEBUG`, + msg: "INFO normal", + want: true, + }, + "partial match": { + include: `err`, + msg: "some error here", + want: false, + }, + "case sensitive": { + include: `ERROR`, + msg: "error", + want: true, + }, + "special chars": { + include: `\d{3}`, + msg: "error 404", + want: false, + }, + "empty message": { + include: `error`, + msg: "", + want: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + f, err := NewFilter(tc.include, tc.exclude) + if err != nil { + t.Fatalf("NewFilter: %v", err) + } + if got := f.ShouldExclude(tc.msg); got != tc.want { + t.Errorf("ShouldExclude(%q) = %v, want %v", tc.msg, got, tc.want) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index 70023024f..23a67732e 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -15,16 +15,14 @@ import ( "net/http" "time" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/batching" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "golang.org/x/sync/errgroup" ) -const ( - numWorkers = 3 - CloudwatchStorage = "cloudwatch" - S3Storage = "s3" -) +const numWorkers = 3 var Client *http.Client = &http.Client{Timeout: 10 * time.Second} @@ -34,21 +32,28 @@ type Forwarder struct { storage string } -func NewForwarder(config *config.Config, client *http.Client, storage string) Forwarder { +func NewForwarder(cfg *config.Config, client *http.Client, storage string) Forwarder { return Forwarder{ - config: config, + config: cfg, client: client, storage: storage, } } -func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { - g, ctx := errgroup.WithContext(ctx) +func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error { + eg, ctx := errgroup.WithContext(ctx) + + batches := make(chan []byte) + batcher := batching.NewBatcher() + eg.Go(func() error { + defer close(batches) + return batcher.Batch(ctx, in, batches) + }) for range numWorkers { - g.Go(func() error { + eg.Go(func() error { for { - body, ok, err := concurrent.SafeReader(ctx, in) + body, ok, err := concurrent.SafeReader(ctx, batches) if err != nil { return err } @@ -63,7 +68,7 @@ func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { }) } - return g.Wait() + return eg.Wait() } // TODO: add retry mechanism for resiliency @@ -87,7 +92,9 @@ func (f Forwarder) send(ctx context.Context, body []byte) error { req.Header.Set("Content-Encoding", "gzip") req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder") req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion) - req.Header.Set("DD-STORAGE-TAG", f.storage) + if f.storage != "" { + req.Header.Set("DD-STORAGE-TAG", f.storage) + } resp, err := f.client.Do(req) if err != nil { diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go index 7dbe83cb3..3217e71d6 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go @@ -16,66 +16,61 @@ import ( "testing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) -func TestForward(t *testing.T) { +func TestForwarder_Start(t *testing.T) { t.Parallel() tests := map[string]struct { statusCode int storage string - payloads [][]byte + entries []model.LogEntry cancelCtx bool wantErr bool wantErrMsg string wantCalls int }{ - "single_message_accepted": { + "single message accepted": { statusCode: http.StatusAccepted, - storage: CloudwatchStorage, - payloads: [][]byte{[]byte("test payload")}, + storage: cloudwatchStorage, + entries: []model.LogEntry{{Message: "test payload"}}, wantCalls: 1, }, - "multiple_messages_accepted": { + "empty channel": { statusCode: http.StatusAccepted, - storage: CloudwatchStorage, - payloads: [][]byte{[]byte("first"), []byte("second"), []byte("third")}, - wantCalls: 3, - }, - "empty_channel": { - statusCode: http.StatusAccepted, - storage: CloudwatchStorage, - payloads: [][]byte{}, + storage: cloudwatchStorage, + entries: []model.LogEntry{}, wantCalls: 0, }, - "server_returns_400": { + "server returns 400": { statusCode: http.StatusBadRequest, - storage: CloudwatchStorage, - payloads: [][]byte{[]byte("test payload")}, + storage: cloudwatchStorage, + entries: []model.LogEntry{{Message: "test payload"}}, wantErr: true, wantErrMsg: "unexpected status from intake", wantCalls: 1, }, - "server_returns_500": { + "server returns 500": { statusCode: http.StatusInternalServerError, - storage: CloudwatchStorage, - payloads: [][]byte{[]byte("test payload")}, + storage: cloudwatchStorage, + entries: []model.LogEntry{{Message: "test payload"}}, wantErr: true, wantErrMsg: "unexpected status from intake", wantCalls: 1, }, - "context_cancelled": { + "context cancelled": { statusCode: http.StatusAccepted, - storage: CloudwatchStorage, - payloads: [][]byte{[]byte("test payload")}, + storage: cloudwatchStorage, + entries: []model.LogEntry{{Message: "test payload"}}, cancelCtx: true, wantErr: true, wantCalls: 0, }, - "s3_storage": { + "s3 storage": { statusCode: http.StatusAccepted, - storage: S3Storage, - payloads: [][]byte{[]byte("test payload")}, + storage: s3Storage, + entries: []model.LogEntry{{Message: "test payload"}}, wantCalls: 1, }, } @@ -136,13 +131,13 @@ func TestForward(t *testing.T) { cancel() } - in := make(chan []byte, len(tc.payloads)) - for _, p := range tc.payloads { - in <- p + in := make(chan model.LogEntry, len(tc.entries)) + for _, e := range tc.entries { + in <- e } close(in) - err := f.Forward(ctx, in) + err := f.Start(ctx, in) if tc.wantErr { if err == nil { diff --git a/aws/logs_monitoring_go/internal/forwarding/storage.go b/aws/logs_monitoring_go/internal/forwarding/storage.go new file mode 100644 index 000000000..292ac75e9 --- /dev/null +++ b/aws/logs_monitoring_go/internal/forwarding/storage.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package forwarding + +import "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" + +const ( + cloudwatchStorage = "cloudwatch" + s3Storage = "s3" +) + +func Storage(source parsing.InvocationSource) string { + switch source { + case parsing.InvocationSourceS3: + return s3Storage + case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis: + return cloudwatchStorage + default: + return "" + } +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go new file mode 100644 index 000000000..d96804ed0 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -0,0 +1,163 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "bytes" + "cmp" + "compress/gzip" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "log/slog" + "slices" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/events" +) + +const ( + logGroupCloudtrail = "_cloudtrail_" + logGroupKinesis = "/aws/kinesis" + logGroupLambda = "/aws/lambda" + logGroupSNS = "sns/" + logStreamStepFunction = "states/" + logStreamCloudtrail = "_CloudTrail_" +) + +type CloudwatchHandler struct { + cfg *config.Config +} + +func NewCloudwatch(cfg *config.Config) *CloudwatchHandler { + return &CloudwatchHandler{ + cfg: cfg, + } +} + +func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { + var cwEvent events.CloudwatchLogsEvent + if err := json.Unmarshal(event, &cwEvent); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + + cwData, err := decodeCloudwatchLogs(cwEvent.AWSLogs.Data) + if err != nil { + return fmt.Errorf("parse: %w", err) + } + + return h.handleCloudwatchData(ctx, cwData, out) +} + +func decodeCloudwatchLogs(data string) (events.CloudwatchLogsData, error) { + raw, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return events.CloudwatchLogsData{}, fmt.Errorf("base64 decode: %w", err) + } + return decompressCloudwatchLogs(raw) +} + +func decompressCloudwatchLogs(data []byte) (events.CloudwatchLogsData, error) { + zr, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return events.CloudwatchLogsData{}, fmt.Errorf("gzip: %w", err) + } + defer func() { + if err := zr.Close(); err != nil { + slog.Warn("failed to close gzip reader", slog.Any("error", err)) + } + }() + + var cwData events.CloudwatchLogsData + if err := json.NewDecoder(zr).Decode(&cwData); err != nil { + return events.CloudwatchLogsData{}, fmt.Errorf("json decode: %w", err) + } + return cwData, nil +} + +func (h CloudwatchHandler) handleCloudwatchData(ctx context.Context, cwData events.CloudwatchLogsData, out chan<- model.LogEntry) error { + if cwData.MessageType == "CONTROL_MESSAGE" { + return nil + } + + lambdaOrigin, err := model.GetLambdaOrigin(ctx) + if err != nil { + return err + } + + base := h.newCloudwatchBaseEntry(cwData, lambdaOrigin) + for _, event := range cwData.LogEvents { + entry := h.newCloudwatchLogEntry(event, base) + if h.cfg.Filter.ShouldExclude(entry.Message) { + continue + } + + entry.Message = h.cfg.Scrubber.Scrub(entry.Message) + if err := concurrent.SafeSender(ctx, out, entry); err != nil { + return err + } + } + + return nil +} + +func (h CloudwatchHandler) newCloudwatchBaseEntry(data events.CloudwatchLogsData, lambdaOrigin model.LambdaOrigin) model.LogEntry { + logGroup := data.LogGroup + logStream := data.LogStream + metadata := model.CloudwatchMetadata{ + LambdaOrigin: lambdaOrigin, + Origin: model.CloudwatchOrigin{ + LogGroup: logGroup, + LogStream: logStream, + Owner: data.Owner, + }, + } + + entry := model.NewLogEntry() + entry.Source = cmp.Or(h.cfg.Source, CloudwatchSource(strings.ToLower(logGroup), logStream)) + entry.Host = cmp.Or(h.cfg.Host, logGroup) + entry.Metadata = metadata + return entry +} + +func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogEvent, entry model.LogEntry) model.LogEntry { + tags, service, message := extractFromMessage(event.Message) + entry.Service = cmp.Or(h.cfg.Service, service, entry.Source) + entry.Tags = slices.Concat(tags, model.Tags{"service:" + entry.Service}, h.cfg.Tags) + entry.Message = message + entry.ID = event.ID + entry.Timestamp = event.Timestamp + return entry +} + +func CloudwatchSource(logGroup, logStream string) string { + if strings.HasPrefix(logStream, logStreamStepFunction) { + return sourceStepFunction + } + if strings.Contains(logStream, logStreamCloudtrail) { + return sourceCloudtrail + } + if strings.HasPrefix(logGroup, logGroupCloudtrail) { + return sourceCloudtrail + } + if strings.HasPrefix(logGroup, logGroupKinesis) { + return sourceKinesis + } + if strings.HasPrefix(logGroup, logGroupLambda) { + return sourceLambda + } + if strings.HasPrefix(logGroup, logGroupSNS) { + return sourceSNS + } + if strings.Contains(logGroup, sourceCloudtrail) { + return sourceCloudtrail + } + return sourceCloudwatch +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go new file mode 100644 index 000000000..2480ce177 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go @@ -0,0 +1,211 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestCloudwatchHandler_Handle(t *testing.T) { + t.Parallel() + + ctx := testutil.LambdaContext(t) + + tests := map[string]struct { + event json.RawMessage + config *config.Config + chanSize int + want []model.LogEntry + wantErr bool + }{ + "invalid JSON": { + event: json.RawMessage(`not json`), + config: testutil.EmptyConfig(), + wantErr: true, + }, + "invalid base64": { + event: json.RawMessage(`{"awslogs":{"data":"!!!not-base64!!!"}}`), + config: testutil.EmptyConfig(), + wantErr: true, + }, + "invalid gzip": { + event: testutil.MustCloudwatchEvent(t, []byte("not gzip")), + config: testutil.EmptyConfig(), + wantErr: true, + }, + "control message": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "CONTROL_MESSAGE", + "logGroup": "/aws/lambda/test", + "logStream": "stream", + "logEvents": []map[string]any{}, + })), + config: testutil.EmptyConfig(), + want: nil, + }, + "single log": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "/aws/lambda/testing-datadog", + "logStream": "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0", + "logEvents": []map[string]any{ + {"id": "ev1", "timestamp": 1583425836114, "message": "hello"}, + }, + })), + config: testutil.EmptyConfig(), + chanSize: 1, + want: []model.LogEntry{ + { + Message: "hello", + Source: "lambda", + SourceCategory: "aws", + Service: "lambda", + Tags: model.Tags{"service:lambda"}, + Host: "/aws/lambda/testing-datadog", + ID: "ev1", + Timestamp: 1583425836114, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, + Origin: model.CloudwatchOrigin{ + LogGroup: "/aws/lambda/testing-datadog", + LogStream: "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0", + Owner: "601427279990", + }, + }, + }, + }, + }, + "multiple log events": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "111111111111", + "logGroup": "/aws/lambda/fn", + "logStream": "stream", + "logEvents": []map[string]any{ + {"id": "a1", "timestamp": 1000, "message": "first"}, + {"id": "a2", "timestamp": 2000, "message": "second"}, + }, + })), + config: testutil.EmptyConfig(), + chanSize: 2, + want: []model.LogEntry{ + { + Message: "first", Source: "lambda", SourceCategory: "aws", + Service: "lambda", Tags: model.Tags{"service:lambda"}, + Host: "/aws/lambda/fn", ID: "a1", Timestamp: 1000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, + Origin: model.CloudwatchOrigin{LogGroup: "/aws/lambda/fn", LogStream: "stream", Owner: "111111111111"}, + }, + }, + { + Message: "second", Source: "lambda", SourceCategory: "aws", + Service: "lambda", Tags: model.Tags{"service:lambda"}, + Host: "/aws/lambda/fn", ID: "a2", Timestamp: 2000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, + Origin: model.CloudwatchOrigin{LogGroup: "/aws/lambda/fn", LogStream: "stream", Owner: "111111111111"}, + }, + }, + }, + }, + "config overrides source, host, service and tags": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "111111111111", + "logGroup": "/aws/lambda/fn", + "logStream": "stream", + "logEvents": []map[string]any{ + {"id": "ev1", "timestamp": 1000, "message": "hello"}, + }, + })), + config: &config.Config{ + Source: "custom-source", + Host: "custom-host", + Service: "custom-service", + Tags: model.Tags{"env:prod", "team:infra"}, + }, + chanSize: 1, + want: []model.LogEntry{ + { + Message: "hello", Source: "custom-source", SourceCategory: "aws", + Service: "custom-service", + Tags: model.Tags{"service:custom-service", "env:prod", "team:infra"}, + Host: "custom-host", ID: "ev1", Timestamp: 1000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, + Origin: model.CloudwatchOrigin{LogGroup: "/aws/lambda/fn", LogStream: "stream", Owner: "111111111111"}, + }, + }, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + out := make(chan model.LogEntry, tc.chanSize) + handler := NewCloudwatch(tc.config) + + err := handler.Handle(ctx, tc.event, out) + close(out) + + var got []model.LogEntry + for entry := range out { + got = append(got, entry) + } + + if tc.wantErr { + if err == nil { + t.Fatal("want error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestCloudwatchSource(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + logGroup string + logStream string + want string + }{ + "step function": {logGroup: "/aws/vendedlogs", logStream: "states/my-machine/abc", want: "stepfunction"}, + "cloudtrail via log stream": {logGroup: "/aws/something", logStream: "123_CloudTrail_us-east-1", want: "cloudtrail"}, + "cloudtrail via log group": {logGroup: "_cloudtrail_logs", logStream: "stream", want: "cloudtrail"}, + "cloudtrail via contains": {logGroup: "my-cloudtrail-group", logStream: "stream", want: "cloudtrail"}, + "kinesis": {logGroup: "/aws/kinesis/my-stream", logStream: "stream", want: "kinesis"}, + "lambda": {logGroup: "/aws/lambda/my-function", logStream: "stream", want: "lambda"}, + "sns": {logGroup: "sns/us-east-1/123/topic", logStream: "stream", want: "sns"}, + "fallback cloudwatch": {logGroup: "/aws/rds/cluster", logStream: "stream", want: "cloudwatch"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + if got := CloudwatchSource(tc.logGroup, tc.logStream); got != tc.want { + t.Errorf("CloudwatchSource(%q, %q) = %q, want %q", tc.logGroup, tc.logStream, got, tc.want) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go new file mode 100644 index 000000000..6edab85a3 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "context" + "encoding/json" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" +) + +var Handlers = make(map[parsing.InvocationSource]Handler) + +type Handler interface { + Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error +} + +func Register(invocation parsing.InvocationSource, handler Handler) { + Handlers[invocation] = handler +} diff --git a/aws/logs_monitoring_go/internal/parsing/kinesis.go b/aws/logs_monitoring_go/internal/handling/kinesis.go similarity index 61% rename from aws/logs_monitoring_go/internal/parsing/kinesis.go rename to aws/logs_monitoring_go/internal/handling/kinesis.go index 4e4781468..a1caf3426 100644 --- a/aws/logs_monitoring_go/internal/parsing/kinesis.go +++ b/aws/logs_monitoring_go/internal/handling/kinesis.go @@ -3,11 +3,10 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "context" - "encoding/base64" "encoding/json" "fmt" "log/slog" @@ -17,25 +16,31 @@ import ( "github.com/aws/aws-lambda-go/events" ) -func HandleKinesis(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { +type KinesisHandler struct { + cfg *config.Config +} + +func NewKinesis(cfg *config.Config) *KinesisHandler { + return &KinesisHandler{ + cfg: cfg, + } +} + +func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var kinesisEvent events.KinesisEvent if err := json.Unmarshal(event, &kinesisEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) } + cw := CloudwatchHandler(h) for i, record := range kinesisEvent.Records { - cwEvent := events.CloudwatchLogsEvent{ - AWSLogs: events.CloudwatchLogsRawData{ - Data: base64.StdEncoding.EncodeToString(record.Kinesis.Data), - }, - } - - cwRaw, err := json.Marshal(cwEvent) + cwData, err := decompressCloudwatchLogs(record.Kinesis.Data) if err != nil { - return fmt.Errorf("marshal: %w", err) + slog.WarnContext(ctx, "skipping kinesis record", slog.Int("i", i), slog.Any("error", err)) + continue } - if err := HandleCloudwatch(ctx, cwRaw, cfg, out); err != nil { + if err := cw.handleCloudwatchData(ctx, cwData, out); err != nil { slog.WarnContext(ctx, "skipping kinesis record", slog.Int("i", i), slog.Any("error", err)) continue } diff --git a/aws/logs_monitoring_go/internal/handling/kinesis_test.go b/aws/logs_monitoring_go/internal/handling/kinesis_test.go new file mode 100644 index 000000000..f4f88a0b2 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/kinesis_test.go @@ -0,0 +1,103 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" +) + +func TestKinesisHandler_Handle(t *testing.T) { + t.Parallel() + + ctx := testutil.LambdaContext(t) + + tests := map[string]struct { + event json.RawMessage + wantN int + wantErr bool + }{ + "invalid JSON": { + event: json.RawMessage(`not json`), + wantErr: true, + }, + "no records": { + event: json.RawMessage(`{"Records":[]}`), + wantN: 0, + }, + "single record": { + event: testutil.MustKinesisEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "logGroup": "/aws/lambda/fn", + "logStream": "stream", + "logEvents": []map[string]any{{"id": "1", "timestamp": 1000, "message": "hello"}}, + })), + wantN: 1, + }, + "multiple records": { + event: testutil.MustKinesisEvent(t, + testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "logGroup": "/aws/lambda/fn-a", + "logStream": "stream", + "logEvents": []map[string]any{{"id": "1", "timestamp": 1000, "message": "a"}}, + }), + testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "logGroup": "/aws/lambda/fn-b", + "logStream": "stream", + "logEvents": []map[string]any{{"id": "2", "timestamp": 2000, "message": "b"}}, + }), + ), + wantN: 2, + }, + "bad record is skipped": { + event: testutil.MustKinesisEvent(t, + []byte("not valid gzip"), + testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "logGroup": "/aws/lambda/good", + "logStream": "stream", + "logEvents": []map[string]any{{"id": "1", "timestamp": 1000, "message": "ok"}}, + }), + ), + wantN: 1, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + out := make(chan model.LogEntry, tc.wantN) + handler := NewKinesis(testutil.EmptyConfig()) + + err := handler.Handle(ctx, tc.event, out) + close(out) + + if tc.wantErr { + if err == nil { + t.Fatal("want error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var got int + for range out { + got++ + } + if got != tc.wantN { + t.Errorf("got %d entries, want %d", got, tc.wantN) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/handling/message.go similarity index 52% rename from aws/logs_monitoring_go/internal/parsing/tags.go rename to aws/logs_monitoring_go/internal/handling/message.go index 470b13abf..1564da207 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags.go +++ b/aws/logs_monitoring_go/internal/handling/message.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "encoding/json" @@ -11,42 +11,8 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" - "github.com/aws/aws-lambda-go/lambdacontext" ) -const ( - ddtagsJSONKey = "ddtags" - forwarderNameKey = "forwardername:" - forwarderVersionKey = "forwarder_version:" - tagSeparator = "," - serviceKey = "service:" - sourceOverrideKey = "source_overridden:" -) - -func getTagsAndService(cfg *config.Config) (tags model.Tags, service string) { - if cfg.CustomTags != "" { - for tag := range strings.SplitSeq(cfg.CustomTags, tagSeparator) { - v, found := strings.CutPrefix(tag, serviceKey) - if found { - if service == "" { - service = v - } - continue - } - tags = append(tags, tag) - } - } - - if cfg.Source != "" { - tags = append(tags, sourceOverrideKey+"true") - } - tags = append(tags, - forwarderNameKey+strings.ToLower(lambdacontext.FunctionName), - forwarderVersionKey+config.ForwarderVersion, - ) - return -} - func extractFromMessage(message string) (model.Tags, string, string) { var tags model.Tags var service string @@ -55,7 +21,7 @@ func extractFromMessage(message string) (model.Tags, string, string) { return nil, service, message } - ddtagsRaw, ok := jsonMessage[ddtagsJSONKey] + ddtagsRaw, ok := jsonMessage[config.DdtagsJSONKey] if !ok { return nil, service, message } @@ -66,8 +32,11 @@ func extractFromMessage(message string) (model.Tags, string, string) { } ddtagsStr = strings.ReplaceAll(ddtagsStr, " ", "") - for tag := range strings.SplitSeq(ddtagsStr, tagSeparator) { - v, found := strings.CutPrefix(tag, serviceKey) + for tag := range strings.SplitSeq(ddtagsStr, config.TagSeparator) { + if tag == "" { + continue + } + v, found := strings.CutPrefix(tag, config.ServiceKey) if found { if service == "" { service = v @@ -77,7 +46,7 @@ func extractFromMessage(message string) (model.Tags, string, string) { tags = append(tags, tag) } - delete(jsonMessage, ddtagsJSONKey) + delete(jsonMessage, config.DdtagsJSONKey) newMessage, err := json.Marshal(jsonMessage) if err != nil { diff --git a/aws/logs_monitoring_go/internal/parsing/tags_test.go b/aws/logs_monitoring_go/internal/handling/message_test.go similarity index 88% rename from aws/logs_monitoring_go/internal/parsing/tags_test.go rename to aws/logs_monitoring_go/internal/handling/message_test.go index da951acfb..7d3deb936 100644 --- a/aws/logs_monitoring_go/internal/parsing/tags_test.go +++ b/aws/logs_monitoring_go/internal/handling/message_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "encoding/json" @@ -11,6 +11,7 @@ import ( "strings" "testing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) @@ -91,7 +92,7 @@ func TestExtractFromMessage(t *testing.T) { }, "ddtags is empty string": { message: `{"msg":"hello","ddtags":""}`, - wantTags: model.Tags{""}, + wantTags: nil, wantService: "", wantMessage: `{"msg":"hello"}`, }, @@ -110,13 +111,13 @@ func TestExtractFromMessage(t *testing.T) { gotTags, gotService, gotMessage := extractFromMessage(tc.message) if !slices.Equal(gotTags, tc.wantTags) { - t.Errorf("tags: got %v, want %v", gotTags, tc.wantTags) + t.Errorf("extractFromMessage(%q) tags: got %v, want %v", tc.message, gotTags, tc.wantTags) } if gotService != tc.wantService { - t.Errorf("service: got %q, want %q", gotService, tc.wantService) + t.Errorf("extractFromMessage(%q) service: got %q, want %q", tc.message, gotService, tc.wantService) } if gotMessage != tc.wantMessage { - t.Errorf("message: got %q, want %q", gotMessage, tc.wantMessage) + t.Errorf("extractFromMessage(%q) message: got %q, want %q", tc.message, gotMessage, tc.wantMessage) } }) } @@ -150,8 +151,8 @@ func FuzzExtractFromMessage(f *testing.F) { if err := json.Unmarshal([]byte(outMessage), &parsed); err != nil { t.Errorf("output message is not valid JSON: %v", err) } - if _, ok := parsed[ddtagsJSONKey]; ok { - t.Errorf("output message still contains %q key", ddtagsJSONKey) + if _, ok := parsed[config.DdtagsJSONKey]; ok { + t.Errorf("output message still contains %q key", config.DdtagsJSONKey) } } diff --git a/aws/logs_monitoring_go/internal/parsing/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go similarity index 51% rename from aws/logs_monitoring_go/internal/parsing/s3.go rename to aws/logs_monitoring_go/internal/handling/s3.go index af07fb21a..9e0b91bd3 100644 --- a/aws/logs_monitoring_go/internal/parsing/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "cmp" @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "log/slog" - "regexp" "slices" "strings" @@ -28,21 +27,23 @@ const ( s3KeyCloudtrail = "_CloudTrail_" ) -type s3EntryBase struct { - metadata model.S3Metadata - source string - service string - tags model.Tags - multilineRegex *regexp.Regexp +type S3Handler struct { + cfg *config.Config } -func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.LogEntry) error { +func NewS3(cfg *config.Config) *S3Handler { + return &S3Handler{ + cfg: cfg, + } +} + +func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var s3Event events.S3Event if err := json.Unmarshal(event, &s3Event); err != nil { return fmt.Errorf("unmarshal: %w", err) } - client, err := getS3APIClient(ctx, cfg.UseFIPS) + client, err := getS3APIClient(ctx, h.cfg.UseFIPS) if err != nil { return fmt.Errorf("get S3 client: %w", err) } @@ -53,16 +54,18 @@ func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, ou } for _, record := range s3Event.Records { - base := newS3EntryBase(record, cfg, lambdaOrigin) - if err := processS3Record(ctx, client, out, base); err != nil { - return fmt.Errorf("process s3://%s/%s: %w", base.metadata.Origin.Bucket, base.metadata.Origin.Key, err) + if err := h.processRecord(ctx, client, out, record, lambdaOrigin); err != nil { + return err } } return nil } -func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, base s3EntryBase) error { - body, err := getS3Object(ctx, client, base.metadata.Origin.Bucket, base.metadata.Origin.Key) +func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, record events.S3EventRecord, lambdaOrigin model.LambdaOrigin) error { + bucket := record.S3.Bucket.Name + key := record.S3.Object.URLDecodedKey + + body, err := getS3Object(ctx, client, bucket, key) if err != nil { return err } @@ -72,10 +75,16 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.L } }() - scanner := NewScanner(body, base.multilineRegex) + scanner := NewScanner(body, h.cfg.S3MultilineLogRegex) for scanner.Scan() { message := strings.ToValidUTF8(scanner.Text(), "") - if err := concurrent.SafeSender(ctx, out, newS3LogEntry(base, message)); err != nil { + entry := h.newS3LogEntry(record, message, lambdaOrigin) + if h.cfg.Filter.ShouldExclude(entry.Message) { + continue + } + + entry.Message = h.cfg.Scrubber.Scrub(entry.Message) + if err := concurrent.SafeSender(ctx, out, entry); err != nil { return err } } @@ -86,36 +95,27 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.L return nil } -func newS3EntryBase(record events.S3EventRecord, cfg *config.Config, lambdaOrigin model.LambdaOrigin) s3EntryBase { - bucket := record.S3.Bucket.Name +func (h S3Handler) newS3LogEntry(record events.S3EventRecord, message string, lambdaOrigin model.LambdaOrigin) model.LogEntry { key := record.S3.Object.URLDecodedKey - source := cmp.Or(cfg.Source, getS3Source(key)) - tags, service := getTagsAndService(cfg) - service = cmp.Or(service, source) - - return s3EntryBase{ - metadata: model.S3Metadata{ - LambdaOrigin: lambdaOrigin, - Origin: model.S3Origin{ - Bucket: bucket, - Key: key, - }, + metadata := model.S3Metadata{ + LambdaOrigin: lambdaOrigin, + Origin: model.S3Origin{ + Bucket: record.S3.Bucket.Name, + Key: key, }, - source: source, - service: service, - tags: tags, - multilineRegex: cfg.S3MultilineLogRegex, } -} - -func newS3LogEntry(base s3EntryBase, message string) model.LogEntry { tags, service, message := extractFromMessage(message) - service = cmp.Or(service, base.service) - tags = slices.Concat(tags, model.Tags{"service:" + service}, base.tags) - return model.NewLogEntry(base.metadata, tags, message, base.source, service) + + entry := model.NewLogEntry() + entry.Message = message + entry.Metadata = metadata + entry.Source = cmp.Or(h.cfg.Source, S3Source(key)) + entry.Service = cmp.Or(h.cfg.Service, service, entry.Source) + entry.Tags = slices.Concat(tags, model.Tags{"service:" + entry.Service}, h.cfg.Tags) + return entry } -func getS3Source(key string) string { +func S3Source(key string) string { if strings.Contains(key, s3KeyWAF1) || strings.Contains(key, s3KeyWAF2) { return sourceWAF } diff --git a/aws/logs_monitoring_go/internal/parsing/s3_client.go b/aws/logs_monitoring_go/internal/handling/s3_client.go similarity index 94% rename from aws/logs_monitoring_go/internal/parsing/s3_client.go rename to aws/logs_monitoring_go/internal/handling/s3_client.go index 772981d04..29738b379 100644 --- a/aws/logs_monitoring_go/internal/parsing/s3_client.go +++ b/aws/logs_monitoring_go/internal/handling/s3_client.go @@ -3,9 +3,9 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling -//go:generate go tool mockgen -source=s3_client.go -package=parsing -destination=s3_client_mockgen.go +//go:generate go tool mockgen -source=s3_client.go -package=handling -destination=s3_client_mockgen.go import ( "context" diff --git a/aws/logs_monitoring_go/internal/parsing/s3_client_mockgen.go b/aws/logs_monitoring_go/internal/handling/s3_client_mockgen.go similarity index 91% rename from aws/logs_monitoring_go/internal/parsing/s3_client_mockgen.go rename to aws/logs_monitoring_go/internal/handling/s3_client_mockgen.go index 8158e3df2..e6c046565 100644 --- a/aws/logs_monitoring_go/internal/parsing/s3_client_mockgen.go +++ b/aws/logs_monitoring_go/internal/handling/s3_client_mockgen.go @@ -3,11 +3,11 @@ // // Generated by this command: // -// mockgen -source=s3_client.go -package=parsing -destination=s3_client_mockgen.go +// mockgen -source=s3_client.go -package=handling -destination=s3_client_mockgen.go // -// Package parsing is a generated GoMock package. -package parsing +// Package handling is a generated GoMock package. +package handling import ( context "context" diff --git a/aws/logs_monitoring_go/internal/parsing/s3_client_test.go b/aws/logs_monitoring_go/internal/handling/s3_client_test.go similarity index 92% rename from aws/logs_monitoring_go/internal/parsing/s3_client_test.go rename to aws/logs_monitoring_go/internal/handling/s3_client_test.go index c5845cc60..0bf4cfb16 100644 --- a/aws/logs_monitoring_go/internal/parsing/s3_client_test.go +++ b/aws/logs_monitoring_go/internal/handling/s3_client_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "errors" @@ -16,13 +16,15 @@ import ( ) func TestGetS3Object(t *testing.T) { + t.Parallel() + tests := map[string]struct { mockSetup func(m *MockS3APIClient) bucket string key string wantErr bool }{ - "success": { + "returns body on success": { mockSetup: func(m *MockS3APIClient) { m.EXPECT(). GetObject(gomock.Any(), gomock.Any()). @@ -33,7 +35,7 @@ func TestGetS3Object(t *testing.T) { bucket: "my-bucket", key: "my-key", }, - "error": { + "returns error on S3 failure": { mockSetup: func(m *MockS3APIClient) { m.EXPECT(). GetObject(gomock.Any(), gomock.Any()). @@ -47,6 +49,7 @@ func TestGetS3Object(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { + t.Parallel() ctrl := gomock.NewController(t) mock := NewMockS3APIClient(ctrl) tc.mockSetup(mock) diff --git a/aws/logs_monitoring_go/internal/parsing/s3_test.go b/aws/logs_monitoring_go/internal/handling/s3_test.go similarity index 64% rename from aws/logs_monitoring_go/internal/parsing/s3_test.go rename to aws/logs_monitoring_go/internal/handling/s3_test.go index df9d95fc0..8a5a2f02b 100644 --- a/aws/logs_monitoring_go/internal/parsing/s3_test.go +++ b/aws/logs_monitoring_go/internal/handling/s3_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "errors" @@ -12,162 +12,150 @@ import ( "strings" "testing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/go-cmp/cmp" "go.uber.org/mock/gomock" ) -var testS3Metadata = model.S3Metadata{ - Origin: model.S3Origin{Bucket: "b", Key: "k"}, +var testS3Record = events.S3EventRecord{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "b"}, + Object: events.S3Object{URLDecodedKey: "k"}, + }, } func TestProcessS3Record(t *testing.T) { + t.Parallel() + tests := map[string]struct { mockSetup func(m *MockS3APIClient) + cfg *config.Config chanSize int - base s3EntryBase want []model.LogEntry wantErr bool }{ - "single_line": { + "single line": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("line1")), }, nil) }, + cfg: testutil.EmptyConfig(), chanSize: 1, - base: newTestS3Base(), want: []model.LogEntry{wantS3Entry("line1", "s3", "s3", model.Tags{"service:s3"})}, }, - "multiple_lines": { + "multiple lines": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("line1\nline2\nline3")), }, nil) }, + cfg: testutil.EmptyConfig(), chanSize: 3, - base: newTestS3Base(), want: []model.LogEntry{ wantS3Entry("line1", "s3", "s3", model.Tags{"service:s3"}), wantS3Entry("line2", "s3", "s3", model.Tags{"service:s3"}), wantS3Entry("line3", "s3", "s3", model.Tags{"service:s3"}), }, }, - "empty_file": { + "empty file": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("")), }, nil) }, - chanSize: 0, - base: newTestS3Base(), - want: nil, + cfg: testutil.EmptyConfig(), + want: nil, }, - "s3_error": { + "s3 error": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(nil, errors.New("access denied")) }, - chanSize: 1, - base: newTestS3Base(), - wantErr: true, + cfg: testutil.EmptyConfig(), + wantErr: true, }, - "ddtags_extraction": { + "ddtags extraction": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader(`{"ddtags":"env:prod,service:myapp","msg":"hello"}`)), }, nil) }, + cfg: testutil.EmptyConfig(), chanSize: 1, - base: newTestS3Base(), want: []model.LogEntry{wantS3Entry(`{"msg":"hello"}`, "s3", "myapp", model.Tags{"env:prod", "service:myapp"})}, }, - "invalid_utf8_stripped": { + "invalid utf8 stripped": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("hello\x80world")), }, nil) }, + cfg: testutil.EmptyConfig(), chanSize: 1, - base: newTestS3Base(), want: []model.LogEntry{wantS3Entry("helloworld", "s3", "s3", model.Tags{"service:s3"})}, }, - "multiline_groups_continuation_lines": { + "multiline groups continuation lines": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n2024-01-15 INFO started")), }, nil) }, + cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, chanSize: 2, - base: newTestS3Base(func(b *s3EntryBase) { - b.multilineRegex = regexp.MustCompile(`\d{4}-\d{2}-\d{2}`) - }), want: []model.LogEntry{ wantS3Entry("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n", "s3", "s3", model.Tags{"service:s3"}), wantS3Entry("2024-01-15 INFO started", "s3", "s3", model.Tags{"service:s3"}), }, }, - "multiline_flushes_at_eof": { + "multiline flushes at eof": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("2024-01-15 ERROR\n stacktrace")), }, nil) }, + cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, chanSize: 1, - base: newTestS3Base(func(b *s3EntryBase) { - b.multilineRegex = regexp.MustCompile(`\d{4}-\d{2}-\d{2}`) - }), - want: []model.LogEntry{wantS3Entry("2024-01-15 ERROR\n stacktrace", "s3", "s3", model.Tags{"service:s3"})}, - }, - "multimatch_single_line": { - mockSetup: func(m *MockS3APIClient) { - m.EXPECT().GetObject(gomock.Any(), gomock.Any()). - Return(&s3.GetObjectOutput{ - Body: io.NopCloser(strings.NewReader("2024-01-15 ERROR2024-01-15 ERROR2024-01-15 ERROR\n stacktrace")), - }, nil) - }, - chanSize: 3, - base: newTestS3Base(func(b *s3EntryBase) { - b.multilineRegex = regexp.MustCompile(`\d{4}-\d{2}-\d{2}`) - }), - want: []model.LogEntry{ - wantS3Entry("2024-01-15 ERROR", "s3", "s3", model.Tags{"service:s3"}), - wantS3Entry("2024-01-15 ERROR", "s3", "s3", model.Tags{"service:s3"}), - wantS3Entry("2024-01-15 ERROR\n stacktrace", "s3", "s3", model.Tags{"service:s3"}), - }, + want: []model.LogEntry{wantS3Entry("2024-01-15 ERROR\n stacktrace", "s3", "s3", model.Tags{"service:s3"})}, }, - "custom_tags_passed_through": { + "custom tags passed through": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("line1")), }, nil) }, + cfg: &config.Config{Tags: model.Tags{"env:prod", "team:aws"}}, chanSize: 1, - base: newTestS3Base(func(b *s3EntryBase) { - b.tags = model.Tags{"env:prod", "team:aws"} - }), - want: []model.LogEntry{wantS3Entry("line1", "s3", "s3", model.Tags{"service:s3", "env:prod", "team:aws"})}, + want: []model.LogEntry{wantS3Entry("line1", "s3", "s3", model.Tags{"service:s3", "env:prod", "team:aws"})}, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - out := make(chan model.LogEntry, tc.chanSize) + t.Parallel() + ctrl := gomock.NewController(t) mock := NewMockS3APIClient(ctrl) tc.mockSetup(mock) - err := processS3Record(t.Context(), mock, out, tc.base) + out := make(chan model.LogEntry, tc.chanSize) + handler := NewS3(tc.cfg) + + err := handler.processRecord(t.Context(), mock, out, testS3Record, testutil.LambdaOrigin()) close(out) + var got []model.LogEntry for entry := range out { got = append(got, entry) @@ -179,7 +167,9 @@ func TestProcessS3Record(t *testing.T) { } return } - + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if diff := cmp.Diff(tc.want, got); diff != "" { t.Errorf("mismatch (-want +got):\n%s", diff) } @@ -187,19 +177,15 @@ func TestProcessS3Record(t *testing.T) { } } -func newTestS3Base(opts ...func(*s3EntryBase)) s3EntryBase { - base := s3EntryBase{ - metadata: testS3Metadata, - source: "s3", - service: "s3", - tags: model.Tags{}, - } - for _, o := range opts { - o(&base) - } - return base -} - func wantS3Entry(message, source, service string, tags model.Tags) model.LogEntry { - return model.NewLogEntry(testS3Metadata, tags, message, source, service) + entry := model.NewLogEntry() + entry.Message = message + entry.Source = source + entry.Service = service + entry.Tags = tags + entry.Metadata = model.S3Metadata{ + LambdaOrigin: testutil.LambdaOrigin(), + Origin: model.S3Origin{Bucket: "b", Key: "k"}, + } + return entry } diff --git a/aws/logs_monitoring_go/internal/parsing/scanner.go b/aws/logs_monitoring_go/internal/handling/scanner.go similarity index 98% rename from aws/logs_monitoring_go/internal/parsing/scanner.go rename to aws/logs_monitoring_go/internal/handling/scanner.go index 8374f6d97..60cd249e8 100644 --- a/aws/logs_monitoring_go/internal/parsing/scanner.go +++ b/aws/logs_monitoring_go/internal/handling/scanner.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "bufio" diff --git a/aws/logs_monitoring_go/internal/parsing/scanner_test.go b/aws/logs_monitoring_go/internal/handling/scanner_test.go similarity index 70% rename from aws/logs_monitoring_go/internal/parsing/scanner_test.go rename to aws/logs_monitoring_go/internal/handling/scanner_test.go index bd5a4807f..dfe302f72 100644 --- a/aws/logs_monitoring_go/internal/parsing/scanner_test.go +++ b/aws/logs_monitoring_go/internal/handling/scanner_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling import ( "regexp" @@ -23,25 +23,25 @@ func TestScanner(t *testing.T) { re *regexp.Regexp want []string }{ - "lines_empty_string": {input: "", re: nil, want: nil}, - "lines_plain_string": {input: "hello", re: nil, want: []string{"hello"}}, - "lines_plain_string_spaces": {input: "hello world !", re: nil, want: []string{"hello world !"}}, - "lines_new_lines": {input: "a\nb\nc", re: nil, want: []string{"a", "b", "c"}}, - "lines_trailing_new_line": {input: "a\nb\n", re: nil, want: []string{"a", "b"}}, - "lines_crlf": {input: "a\r\nb\r\nc", re: nil, want: []string{"a", "b", "c"}}, - "lines_form_feed": {input: "a\fb\fc", re: nil, want: []string{"a", "b", "c"}}, - "lines_mixed_delimiters": {input: "a\r\n\fb", re: nil, want: []string{"a", "b"}}, - "lines_only_delimiters": {input: "\n\r\f", re: nil, want: nil}, - - "regex_empty": {input: "", re: dateRegex, want: nil}, - "regex_not_matching_at_start": {input: "ERROR something2024-01-15 ERROR something", re: dateRegex, want: []string{"ERROR something", "2024-01-15 ERROR something"}}, - "regex_single_entry": {input: "2024-01-15 ERROR something", re: dateRegex, want: []string{"2024-01-15 ERROR something"}}, - "regex_two_entries_with_newline": {input: "2024-01-15 ERROR\n2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR\n", "2024-01-16 INFO"}}, - "regex_continuation_lines": {input: "2024-01-15 ERROR\n at com.foo\n2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR\n at com.foo\n", "2024-01-16 INFO"}}, - "regex_multiple_matches_one_line": {input: "2024-01-15 ERROR2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR", "2024-01-16 INFO"}}, - "regex_three_matches_one_line": {input: "2024-01-15 A2024-01-16 B2024-01-17 C", re: dateRegex, want: []string{"2024-01-15 A", "2024-01-16 B", "2024-01-17 C"}}, - "regex_empty_string": {input: "", re: dateRegex, want: nil}, - "regex_no_match": {input: "hello world", re: dateRegex, want: []string{"hello world"}}, + "lines empty string": {input: "", re: nil, want: nil}, + "lines plain string": {input: "hello", re: nil, want: []string{"hello"}}, + "lines plain string spaces": {input: "hello world !", re: nil, want: []string{"hello world !"}}, + "lines new lines": {input: "a\nb\nc", re: nil, want: []string{"a", "b", "c"}}, + "lines trailing new line": {input: "a\nb\n", re: nil, want: []string{"a", "b"}}, + "lines crlf": {input: "a\r\nb\r\nc", re: nil, want: []string{"a", "b", "c"}}, + "lines form feed": {input: "a\fb\fc", re: nil, want: []string{"a", "b", "c"}}, + "lines mixed delimiters": {input: "a\r\n\fb", re: nil, want: []string{"a", "b"}}, + "lines only delimiters": {input: "\n\r\f", re: nil, want: nil}, + + "regex empty": {input: "", re: dateRegex, want: nil}, + "regex not matching at start": {input: "ERROR something2024-01-15 ERROR something", re: dateRegex, want: []string{"ERROR something", "2024-01-15 ERROR something"}}, + "regex single entry": {input: "2024-01-15 ERROR something", re: dateRegex, want: []string{"2024-01-15 ERROR something"}}, + "regex two entries with newline": {input: "2024-01-15 ERROR\n2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR\n", "2024-01-16 INFO"}}, + "regex continuation lines": {input: "2024-01-15 ERROR\n at com.foo\n2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR\n at com.foo\n", "2024-01-16 INFO"}}, + "regex multiple matches one line": {input: "2024-01-15 ERROR2024-01-16 INFO", re: dateRegex, want: []string{"2024-01-15 ERROR", "2024-01-16 INFO"}}, + "regex three matches one line": {input: "2024-01-15 A2024-01-16 B2024-01-17 C", re: dateRegex, want: []string{"2024-01-15 A", "2024-01-16 B", "2024-01-17 C"}}, + "regex empty string": {input: "", re: dateRegex, want: nil}, + "regex no match": {input: "hello world", re: dateRegex, want: []string{"hello world"}}, } for name, tc := range tests { diff --git a/aws/logs_monitoring_go/internal/parsing/source.go b/aws/logs_monitoring_go/internal/handling/source.go similarity index 96% rename from aws/logs_monitoring_go/internal/parsing/source.go rename to aws/logs_monitoring_go/internal/handling/source.go index 0a3929878..13071e150 100644 --- a/aws/logs_monitoring_go/internal/parsing/source.go +++ b/aws/logs_monitoring_go/internal/handling/source.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package parsing +package handling const ( sourceCloudtrail = "cloudtrail" diff --git a/aws/logs_monitoring_go/internal/model/cloudwatch.go b/aws/logs_monitoring_go/internal/model/cloudwatch.go index c9467cc16..a9bf55fda 100644 --- a/aws/logs_monitoring_go/internal/model/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/model/cloudwatch.go @@ -5,13 +5,6 @@ package model -type CloudwatchLogEntry struct { - LogEntry - ID string `json:"id"` - Timestamp int64 `json:"timestamp"` - Host string `json:"hostname"` -} - type CloudwatchMetadata struct { LambdaOrigin Origin CloudwatchOrigin `json:"awslogs"` diff --git a/aws/logs_monitoring_go/internal/model/log.go b/aws/logs_monitoring_go/internal/model/log.go index e9a784775..c26da1ac9 100644 --- a/aws/logs_monitoring_go/internal/model/log.go +++ b/aws/logs_monitoring_go/internal/model/log.go @@ -13,22 +13,20 @@ import ( const sourceCategory = "aws" type LogEntry struct { - Message string `json:"message"` + Host string `json:"hostname,omitempty"` + ID string `json:"id,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + Message string `json:"message,omitempty"` + Service string `json:"service,omitempty"` Source string `json:"ddsource"` SourceCategory string `json:"ddsourcecategory"` - Service string `json:"service"` Tags Tags `json:"ddtags"` Metadata any `json:"aws"` } -func NewLogEntry(metadata any, tags Tags, message, source, service string) LogEntry { +func NewLogEntry() LogEntry { return LogEntry{ - Message: message, - Source: source, SourceCategory: sourceCategory, - Service: service, - Tags: tags, - Metadata: metadata, } } diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatch.go b/aws/logs_monitoring_go/internal/parsing/cloudwatch.go deleted file mode 100644 index 129f59f42..000000000 --- a/aws/logs_monitoring_go/internal/parsing/cloudwatch.go +++ /dev/null @@ -1,130 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "cmp" - "context" - "encoding/json" - "fmt" - "slices" - "strings" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" - "github.com/aws/aws-lambda-go/events" -) - -const ( - logGroupCloudtrail = "_cloudtrail_" - logGroupKinesis = "/aws/kinesis" - logGroupLambda = "/aws/lambda" - logGroupSNS = "sns/" - logStreamStepFunction = "states/" - logStreamCloudtrail = "_CloudTrail_" -) - -type cloudwatchEntryBase struct { - metadata model.CloudwatchMetadata - source string - host string - service string - tags model.Tags -} - -func HandleCloudwatch(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { - var cwEvent events.CloudwatchLogsEvent - if err := json.Unmarshal(event, &cwEvent); err != nil { - return fmt.Errorf("unmarshal: %w", err) - } - - cwData, err := cwEvent.AWSLogs.Parse() - if err != nil { - return fmt.Errorf("parse: %w", err) - } - - if cwData.MessageType == "CONTROL_MESSAGE" { - return nil - } - - base, err := newCloudwatchEntryBase(ctx, cwData, cfg) - if err != nil { - return fmt.Errorf("create cloudwatch entry base: %w", err) - } - - for _, event := range cwData.LogEvents { - if err := concurrent.SafeSender(ctx, out, newCloudwatchLogEntry(base, event)); err != nil { - return err - } - } - - return nil -} - -func newCloudwatchEntryBase(ctx context.Context, data events.CloudwatchLogsData, cfg *config.Config) (cloudwatchEntryBase, error) { - lambdaOrigin, err := model.GetLambdaOrigin(ctx) - if err != nil { - return cloudwatchEntryBase{}, err - } - - source := cmp.Or(cfg.Source, getCloudwatchSource(strings.ToLower(data.LogGroup), data.LogStream)) - host := cmp.Or(cfg.Host, data.LogGroup) - tags, service := getTagsAndService(cfg) - service = cmp.Or(service, source) - - return cloudwatchEntryBase{ - metadata: model.CloudwatchMetadata{ - LambdaOrigin: lambdaOrigin, - Origin: model.CloudwatchOrigin{ - LogGroup: data.LogGroup, - LogStream: data.LogStream, - Owner: data.Owner, - }, - }, - source: source, - host: host, - service: service, - tags: tags, - }, nil -} - -func newCloudwatchLogEntry(base cloudwatchEntryBase, event events.CloudwatchLogsLogEvent) model.CloudwatchLogEntry { - tags, service, message := extractFromMessage(event.Message) - service = cmp.Or(service, base.service) - tags = slices.Concat(tags, model.Tags{"service:" + service}, base.tags) - return model.CloudwatchLogEntry{ - LogEntry: model.NewLogEntry(base.metadata, tags, message, base.source, service), - ID: event.ID, - Timestamp: event.Timestamp, - Host: base.host, - } -} - -func getCloudwatchSource(logGroup, logStream string) string { - if strings.HasPrefix(logStream, logStreamStepFunction) { - return sourceStepFunction - } - if strings.Contains(logStream, logStreamCloudtrail) { - return sourceCloudtrail - } - if strings.HasPrefix(logGroup, logGroupCloudtrail) { - return sourceCloudtrail - } - if strings.HasPrefix(logGroup, logGroupKinesis) { - return sourceKinesis - } - if strings.HasPrefix(logGroup, logGroupLambda) { - return sourceLambda - } - if strings.HasPrefix(logGroup, logGroupSNS) { - return sourceSNS - } - if strings.Contains(logGroup, sourceCloudtrail) { - return sourceCloudtrail - } - return sourceCloudwatch -} diff --git a/aws/logs_monitoring_go/internal/parsing/kinesis_test.go b/aws/logs_monitoring_go/internal/parsing/kinesis_test.go deleted file mode 100644 index 3699b3156..000000000 --- a/aws/logs_monitoring_go/internal/parsing/kinesis_test.go +++ /dev/null @@ -1,243 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "compress/gzip" - "encoding/json" - "testing" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambdacontext" - "github.com/google/go-cmp/cmp" -) - -func TestHandleKinesis(t *testing.T) { - t.Parallel() - - ctx := lambdacontext.NewContext(t.Context(), &lambdacontext.LambdaContext{ - InvokedFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:forwarder", - }) - - tests := map[string]struct { - event json.RawMessage - config *config.Config - chanSize int - want []model.CloudwatchLogEntry - wantErr bool - }{ - "not a Kinesis event": { - event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - config: &config.Config{}, - want: nil, - }, - "invalid JSON": { - event: json.RawMessage(`not json`), - config: &config.Config{}, - wantErr: true, - }, - "single log": { - event: mustKinesisEvent(t, mustGzipJSON(t, map[string]any{ - "messageType": "DATA_MESSAGE", - "owner": "601427279990", - "logGroup": "/aws/lambda/testing-datadog", - "logStream": "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0", - "logEvents": []map[string]any{ - {"id": "ev1", "timestamp": 1583425836114, "message": `{"status": "debug", "message": "hello"}`}, - }, - })), - config: &config.Config{}, - chanSize: 1, - want: []model.CloudwatchLogEntry{ - { - LogEntry: model.LogEntry{ - Message: `{"status": "debug", "message": "hello"}`, - Source: "lambda", - SourceCategory: "aws", - Service: "lambda", - Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"}, - Metadata: model.CloudwatchMetadata{ - LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, - Origin: model.CloudwatchOrigin{ - LogGroup: "/aws/lambda/testing-datadog", - LogStream: "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0", - Owner: "601427279990", - }, - }, - }, - ID: "ev1", - Timestamp: 1583425836114, - Host: "/aws/lambda/testing-datadog", - }, - }, - }, - "multiple records": { - event: mustKinesisEvent(t, - mustGzipJSON(t, map[string]any{ - "messageType": "DATA_MESSAGE", - "owner": "111111111111", - "logGroup": "/aws/lambda/fn-a", - "logStream": "stream-a", - "logEvents": []map[string]any{ - {"id": "a1", "timestamp": 1000, "message": "from-a"}, - }, - }), - mustGzipJSON(t, map[string]any{ - "messageType": "DATA_MESSAGE", - "owner": "222222222222", - "logGroup": "/aws/rds/cluster", - "logStream": "stream-b", - "logEvents": []map[string]any{ - {"id": "b1", "timestamp": 2000, "message": "from-b"}, - }, - }), - ), - config: &config.Config{}, - chanSize: 2, - want: []model.CloudwatchLogEntry{ - { - LogEntry: model.LogEntry{ - Message: "from-a", - Source: "lambda", - SourceCategory: "aws", - Service: "lambda", - Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"}, - Metadata: model.CloudwatchMetadata{ - LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, - Origin: model.CloudwatchOrigin{LogGroup: "/aws/lambda/fn-a", LogStream: "stream-a", Owner: "111111111111"}, - }, - }, - ID: "a1", - Timestamp: 1000, - Host: "/aws/lambda/fn-a", - }, - { - LogEntry: model.LogEntry{ - Message: "from-b", - Source: "cloudwatch", - SourceCategory: "aws", - Service: "cloudwatch", - Tags: model.Tags{"service:cloudwatch", "forwardername:", "forwarder_version:6.0"}, - Metadata: model.CloudwatchMetadata{ - LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, - Origin: model.CloudwatchOrigin{LogGroup: "/aws/rds/cluster", LogStream: "stream-b", Owner: "222222222222"}, - }, - }, - ID: "b1", - Timestamp: 2000, - Host: "/aws/rds/cluster", - }, - }, - }, - "bad record is skipped": { - event: mustKinesisEvent(t, - []byte("not valid gzip"), - mustGzipJSON(t, map[string]any{ - "messageType": "DATA_MESSAGE", - "owner": "123456789012", - "logGroup": "/aws/lambda/good", - "logStream": "stream", - "logEvents": []map[string]any{ - {"id": "g1", "timestamp": 3000, "message": "survived"}, - }, - }), - ), - config: &config.Config{}, - chanSize: 1, - want: []model.CloudwatchLogEntry{ - { - LogEntry: model.LogEntry{ - Message: "survived", - Source: "lambda", - SourceCategory: "aws", - Service: "lambda", - Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"}, - Metadata: model.CloudwatchMetadata{ - LambdaOrigin: model.LambdaOrigin{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"}, - Origin: model.CloudwatchOrigin{LogGroup: "/aws/lambda/good", LogStream: "stream", Owner: "123456789012"}, - }, - }, - ID: "g1", - Timestamp: 3000, - Host: "/aws/lambda/good", - }, - }, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - out := make(chan model.CloudwatchLogEntry, tc.chanSize) - - err := HandleKinesis(ctx, tc.event, tc.config, out) - close(out) - - var got []model.CloudwatchLogEntry - for entry := range out { - got = append(got, entry) - } - - if tc.wantErr { - if err == nil { - t.Fatal("want error, got nil") - } - return - } - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Errorf("mismatch (-want +got):\n%s", diff) - } - }) - } -} - -func mustGzipJSON(t *testing.T, v any) []byte { - t.Helper() - - raw, err := json.Marshal(v) - if err != nil { - t.Fatalf("marshal: %v", err) - } - - var buf bytes.Buffer - w := gzip.NewWriter(&buf) - if _, err := w.Write(raw); err != nil { - t.Fatalf("gzip write: %v", err) - } - - if err := w.Close(); err != nil { - t.Fatalf("gzip close: %v", err) - } - - return buf.Bytes() -} - -func mustKinesisEvent(t *testing.T, records ...[]byte) json.RawMessage { - t.Helper() - - var evt events.KinesisEvent - for _, data := range records { - evt.Records = append(evt.Records, events.KinesisEventRecord{ - Kinesis: events.KinesisRecord{ - Data: data, - }, - }) - } - - raw, err := json.Marshal(evt) - if err != nil { - t.Fatalf("marshal kinesis event: %v", err) - } - - return raw -} diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index bcb974971..28ddf1598 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -9,39 +9,28 @@ import ( "context" "encoding/json" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/processing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "golang.org/x/sync/errgroup" ) -func Run[T any]( +func Start( ctx context.Context, event json.RawMessage, - cfg *config.Config, - storage string, - handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error, + run *Run, ) error { - g, ctx := errgroup.WithContext(ctx) // Fragile because if one goroutine fails, all stages will stop gracefully + g, ctx := errgroup.WithContext(ctx) - entries := make(chan T) - batches := make(chan []byte) - - batcher := processing.NewBatcher[T]() - forwarder := forwarding.NewForwarder(cfg, forwarding.Client, storage) + entries := make(chan model.LogEntry) + forwarder := forwarding.NewForwarder(run.Cfg, forwarding.Client, run.Storage) g.Go(func() error { defer close(entries) - return handler(ctx, event, cfg, entries) - }) - - g.Go(func() error { - defer close(batches) - return batcher.Batch(ctx, entries, batches) + return run.Handler.Handle(ctx, event, entries) }) g.Go(func() error { - return forwarder.Forward(ctx, batches) + return forwarder.Start(ctx, entries) }) return g.Wait() diff --git a/aws/logs_monitoring_go/internal/pipeline/run.go b/aws/logs_monitoring_go/internal/pipeline/run.go new file mode 100644 index 000000000..5aee557a1 --- /dev/null +++ b/aws/logs_monitoring_go/internal/pipeline/run.go @@ -0,0 +1,25 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package pipeline + +import ( + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" +) + +type Run struct { + Cfg *config.Config + Handler handling.Handler + Storage string +} + +func NewRun(cfg *config.Config, handler handling.Handler, storage string) *Run { + return &Run{ + Cfg: cfg, + Handler: handler, + Storage: storage, + } +} diff --git a/aws/logs_monitoring_go/internal/processing/batching_test.go b/aws/logs_monitoring_go/internal/processing/batching_test.go deleted file mode 100644 index 9ca0490f2..000000000 --- a/aws/logs_monitoring_go/internal/processing/batching_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package processing - -import ( - "encoding/json" - "strings" - "testing" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" -) - -func TestBatch(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - entries []model.LogEntry - wantBatchCount int - wantEntryCounts []int - }{ - "empty": { - entries: nil, - wantBatchCount: 0, - }, - "single entry": { - entries: []model.LogEntry{makeTestLogEntry()}, - wantBatchCount: 1, - wantEntryCounts: []int{1}, - }, - "multiple entries, one batch": { - entries: []model.LogEntry{makeTestLogEntry(), makeTestLogEntry(), makeTestLogEntry()}, - wantBatchCount: 1, - wantEntryCounts: []int{3}, - }, - "drop oversized entry": { - entries: []model.LogEntry{model.NewLogEntry(nil, nil, strings.Repeat("x", maxItemSize+1), "", ""), makeTestLogEntry()}, - wantBatchCount: 1, - wantEntryCounts: []int{1}, - }, - "split": { - entries: func() []model.LogEntry { - entries := make([]model.LogEntry, maxItemsPerBatch+1) - for i := range entries { - entries[i] = makeTestLogEntry() - } - return entries - }(), - wantBatchCount: 2, - wantEntryCounts: []int{maxItemsPerBatch, 1}, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - batches := collectBatches(t, feedChannel(tc.entries...)) - if len(batches) != tc.wantBatchCount { - t.Fatalf("expected %d batches, got %d", tc.wantBatchCount, len(batches)) - } - - for i, wantCount := range tc.wantEntryCounts { - var entries []model.LogEntry - if err := json.Unmarshal(batches[i], &entries); err != nil { - t.Fatalf("failed to unmarshal batch %d: %v", i, err) - } - if len(entries) != wantCount { - t.Errorf("batch %d: expected %d entries, got %d", i, wantCount, len(entries)) - } - } - }) - } -} - -func makeTestLogEntry() model.LogEntry { - return model.NewLogEntry(nil, nil, "test", "test", "test") -} - -func feedChannel(entries ...model.LogEntry) <-chan model.LogEntry { - ch := make(chan model.LogEntry, len(entries)) - for _, e := range entries { - ch <- e - } - close(ch) - return ch -} - -func collectBatches(t *testing.T, in <-chan model.LogEntry) [][]byte { - t.Helper() - - out := make(chan []byte, 100) - batcher := NewBatcher[model.LogEntry]() - err := batcher.Batch(t.Context(), in, out) - close(out) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - var batches [][]byte - for b := range out { - batches = append(batches, b) - } - return batches -} diff --git a/aws/logs_monitoring_go/internal/processing/filtering.go b/aws/logs_monitoring_go/internal/processing/filtering.go deleted file mode 100644 index 17c7f320a..000000000 --- a/aws/logs_monitoring_go/internal/processing/filtering.go +++ /dev/null @@ -1,52 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package processing - -import ( - "log/slog" - "regexp" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" -) - -type Filter struct { - includeRegex *regexp.Regexp - excludeRegex *regexp.Regexp -} - -func NewFilter(cfg config.FilteringConfig) Filter { - var f Filter - - if cfg.IncludePattern != "" { - re, err := regexp.Compile(cfg.IncludePattern) - if err != nil { - slog.Error("invalid include filter pattern", slog.String("pattern", cfg.IncludePattern), slog.Any("error", err)) - } else { - f.includeRegex = re - } - } - - if cfg.ExcludePattern != "" { - re, err := regexp.Compile(cfg.ExcludePattern) - if err != nil { - slog.Error("invalid exclude filter pattern", slog.String("pattern", cfg.ExcludePattern), slog.Any("error", err)) - } else { - f.excludeRegex = re - } - } - - return f -} - -func (f Filter) Match(msg string) bool { - if f.excludeRegex != nil && f.excludeRegex.MatchString(msg) { - return false - } - if f.includeRegex != nil && !f.includeRegex.MatchString(msg) { - return false - } - return true -} diff --git a/aws/logs_monitoring_go/internal/processing/filtering_test.go b/aws/logs_monitoring_go/internal/processing/filtering_test.go deleted file mode 100644 index 6f3e1ceb8..000000000 --- a/aws/logs_monitoring_go/internal/processing/filtering_test.go +++ /dev/null @@ -1,93 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package processing - -import ( - "testing" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" -) - -func TestFilterMatch(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - cfg config.FilteringConfig - msg string - want bool - }{ - "no_filter": { - cfg: config.FilteringConfig{}, - msg: "hello", - want: true, - }, - "include_match": { - cfg: config.FilteringConfig{IncludePattern: `error`}, - msg: "an error occurred", - want: true, - }, - "include_no_match": { - cfg: config.FilteringConfig{IncludePattern: `error`}, - msg: "hello", - want: false, - }, - "exclude_match": { - cfg: config.FilteringConfig{ExcludePattern: `DEBUG`}, - msg: "DEBUG message", - want: false, - }, - "exclude_no_match": { - cfg: config.FilteringConfig{ExcludePattern: `DEBUG`}, - msg: "INFO message", - want: true, - }, - "exclude_overrides_include": { - cfg: config.FilteringConfig{IncludePattern: `error`, ExcludePattern: `error`}, - msg: "error message", - want: false, - }, - "include_match_exclude_not_match": { - cfg: config.FilteringConfig{IncludePattern: `error`, ExcludePattern: `DEBUG`}, - msg: "error happened", - want: true, - }, - "include_and_exclude_neither_match": { - cfg: config.FilteringConfig{IncludePattern: `error`, ExcludePattern: `DEBUG`}, - msg: "INFO normal", - want: false, - }, - "partial_match": { - cfg: config.FilteringConfig{IncludePattern: `err`}, - msg: "some error here", - want: true, - }, - "case_sensitive": { - cfg: config.FilteringConfig{IncludePattern: `ERROR`}, - msg: "error", - want: false, - }, - "special_chars": { - cfg: config.FilteringConfig{IncludePattern: `\d{3}`}, - msg: "error 404", - want: true, - }, - "empty": { - cfg: config.FilteringConfig{IncludePattern: `error`}, - msg: "", - want: false, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - f := NewFilter(tc.cfg) - if got := f.Match(tc.msg); got != tc.want { - t.Errorf("Match(%q) = %v, want %v", tc.msg, got, tc.want) - } - }) - } -} diff --git a/aws/logs_monitoring_go/internal/processing/scrubbing_test.go b/aws/logs_monitoring_go/internal/processing/scrubbing_test.go deleted file mode 100644 index 263faf2ba..000000000 --- a/aws/logs_monitoring_go/internal/processing/scrubbing_test.go +++ /dev/null @@ -1,120 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package processing - -import ( - "testing" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" -) - -func TestNewScrubber(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - cfg config.ScrubbingConfig - nRules int - }{ - "no_rules": { - cfg: config.ScrubbingConfig{}, - nRules: 0, - }, - "ip_only": { - cfg: config.ScrubbingConfig{ScrubIP: true}, - nRules: 1, - }, - "email_only": { - cfg: config.ScrubbingConfig{ScrubEmail: true}, - nRules: 1, - }, - "ip_and_email": { - cfg: config.ScrubbingConfig{ScrubIP: true, ScrubEmail: true}, - nRules: 2, - }, - "custom_rule": { - cfg: config.ScrubbingConfig{CustomRule: `\d+`, CustomReplacement: "NUM"}, - nRules: 1, - }, - "all_rules": { - cfg: config.ScrubbingConfig{ScrubIP: true, ScrubEmail: true, CustomRule: `secret`, CustomReplacement: "[REDACTED]"}, - nRules: 3, - }, - "invalid_custom_regex": { - cfg: config.ScrubbingConfig{CustomRule: `([invalid`}, - nRules: 0, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - s := NewScrubber(tc.cfg) - if got := len(s.rules); got != tc.nRules { - t.Errorf("got %d rules, want %d", got, tc.nRules) - } - }) - } -} - -func TestScrubMessage(t *testing.T) { - tests := map[string]struct { - cfg config.ScrubbingConfig - input string - want string - }{ - "ip_redaction": { - cfg: config.ScrubbingConfig{ScrubIP: true}, - input: "connected from 192.168.1.1 to 10.0.0.1", - want: "connected from xxx.xxx.xxx.xxx to xxx.xxx.xxx.xxx", - }, - "email_redaction": { - cfg: config.ScrubbingConfig{ScrubEmail: true}, - input: "user john.doe@example.com logged in", - want: "user xxxxx@xxxxx.com logged in", - }, - "custom_pattern": { - cfg: config.ScrubbingConfig{CustomRule: `secret-\w+`, CustomReplacement: "[REDACTED]"}, - input: "token=secret-abc123 visible", - want: "token=[REDACTED] visible", - }, - "custom_empty_replacement": { - cfg: config.ScrubbingConfig{CustomRule: `remove-this `}, - input: "remove-this here", - want: "here", - }, - "ip_and_email_sequential": { - cfg: config.ScrubbingConfig{ScrubIP: true, ScrubEmail: true}, - input: "192.168.1.1 user@host.com", - want: "xxx.xxx.xxx.xxx xxxxx@xxxxx.com", - }, - "no_match": { - cfg: config.ScrubbingConfig{ScrubIP: true, ScrubEmail: true}, - input: "clean message with no sensitive data", - want: "clean message with no sensitive data", - }, - "multiple_ips": { - cfg: config.ScrubbingConfig{ScrubIP: true}, - input: "src=1.2.3.4 dst=5.6.7.8 via=10.0.0.1", - want: "src=xxx.xxx.xxx.xxx dst=xxx.xxx.xxx.xxx via=xxx.xxx.xxx.xxx", - }, - "non_ascii_custom": { - cfg: config.ScrubbingConfig{CustomRule: `[^\x01-\x7f]+`, CustomReplacement: "xxxxx"}, - input: "abcdef\u65e5\u672c\u8a9eefg\u304b\u304d\u304f\u3051\u3053hij", - want: "abcdefxxxxxefgxxxxxhij", - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - s := NewScrubber(tc.cfg) - got := s.ScrubMessage(tc.input) - if got != tc.want { - t.Errorf("got %q, want %q", got, tc.want) - } - }) - } -} diff --git a/aws/logs_monitoring_go/internal/processing/scrubbing.go b/aws/logs_monitoring_go/internal/scrubbing/scrubber.go similarity index 56% rename from aws/logs_monitoring_go/internal/processing/scrubbing.go rename to aws/logs_monitoring_go/internal/scrubbing/scrubber.go index 9f623e298..be992b857 100644 --- a/aws/logs_monitoring_go/internal/processing/scrubbing.go +++ b/aws/logs_monitoring_go/internal/scrubbing/scrubber.go @@ -3,13 +3,11 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2026-Present Datadog, Inc. -package processing +package scrubbing import ( - "log/slog" + "fmt" "regexp" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" ) const ( @@ -29,44 +27,40 @@ type Scrubber struct { rules []scrubbingRule } -func NewScrubber(cfg config.ScrubbingConfig) Scrubber { +func NewScrubber(customMatch, customReplacement string, ip, email bool) (*Scrubber, error) { var rules []scrubbingRule - - if cfg.ScrubIP { + if ip { rules = append(rules, scrubbingRule{ regex: regexp.MustCompile(ipPattern), replacement: ipReplacement, }) } - - if cfg.ScrubEmail { + if email { rules = append(rules, scrubbingRule{ regex: regexp.MustCompile(emailPattern), replacement: emailReplacement, }) } - - if cfg.CustomRule != "" { - re, err := regexp.Compile(cfg.CustomRule) + if customMatch != "" { + re, err := regexp.Compile(customMatch) if err != nil { - slog.Error("invalid custom scrubbing rule, make sure your regex is RE2 compatible", - slog.String("pattern", cfg.CustomRule), - slog.Any("error", err), - ) - } else { - rules = append(rules, scrubbingRule{ - regex: re, - replacement: cfg.CustomReplacement, - }) + return nil, fmt.Errorf("compile custom scrubbing: %w", err) } - } - return Scrubber{rules: rules} + rules = append(rules, scrubbingRule{ + regex: re, + replacement: customReplacement, + }) + } + return &Scrubber{rules: rules}, nil } -func (s Scrubber) ScrubMessage(msg string) string { +func (s *Scrubber) Scrub(content string) string { + if s == nil { + return content + } for _, rule := range s.rules { - msg = rule.regex.ReplaceAllString(msg, rule.replacement) + content = rule.regex.ReplaceAllString(content, rule.replacement) } - return msg + return content } diff --git a/aws/logs_monitoring_go/internal/scrubbing/scrubber_test.go b/aws/logs_monitoring_go/internal/scrubbing/scrubber_test.go new file mode 100644 index 000000000..f05d901c3 --- /dev/null +++ b/aws/logs_monitoring_go/internal/scrubbing/scrubber_test.go @@ -0,0 +1,146 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package scrubbing + +import ( + "testing" +) + +func TestNewScrubber(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + customMatch string + customReplacement string + ip bool + email bool + nRules int + wantErr bool + }{ + "no rules": { + nRules: 0, + }, + "ip only": { + ip: true, + nRules: 1, + }, + "email only": { + email: true, + nRules: 1, + }, + "ip and email": { + ip: true, + email: true, + nRules: 2, + }, + "custom rule": { + customMatch: `\d+`, + customReplacement: "NUM", + nRules: 1, + }, + "all rules": { + ip: true, + email: true, + customMatch: `secret`, + customReplacement: "[REDACTED]", + nRules: 3, + }, + "invalid custom regex": { + customMatch: `([invalid`, + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + s, err := NewScrubber(tc.customMatch, tc.customReplacement, tc.ip, tc.email) + if tc.wantErr { + if err == nil { + t.Fatal("want error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := len(s.rules); got != tc.nRules { + t.Errorf("got %d rules, want %d", got, tc.nRules) + } + }) + } +} + +func TestScrub(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + customMatch string + customReplacement string + ip bool + email bool + input string + want string + }{ + "ip_redaction": { + ip: true, + input: "connected from 192.168.1.1 to 10.0.0.1", + want: "connected from xxx.xxx.xxx.xxx to xxx.xxx.xxx.xxx", + }, + "email_redaction": { + email: true, + input: "user john.doe@example.com logged in", + want: "user xxxxx@xxxxx.com logged in", + }, + "custom_pattern": { + customMatch: `secret-\w+`, + customReplacement: "[REDACTED]", + input: "token=secret-abc123 visible", + want: "token=[REDACTED] visible", + }, + "custom_empty_replacement": { + customMatch: `remove-this `, + input: "remove-this here", + want: "here", + }, + "ip_and_email_sequential": { + ip: true, + email: true, + input: "192.168.1.1 user@host.com", + want: "xxx.xxx.xxx.xxx xxxxx@xxxxx.com", + }, + "no_match": { + ip: true, + email: true, + input: "clean message with no sensitive data", + want: "clean message with no sensitive data", + }, + "multiple_ips": { + ip: true, + input: "src=1.2.3.4 dst=5.6.7.8 via=10.0.0.1", + want: "src=xxx.xxx.xxx.xxx dst=xxx.xxx.xxx.xxx via=xxx.xxx.xxx.xxx", + }, + "non_ascii_custom": { + customMatch: `[^\x01-\x7f]+`, + customReplacement: "xxxxx", + input: "abcdef\u65e5\u672c\u8a9eefg\u304b\u304d\u304f\u3051\u3053hij", // abcdef日本語efgかきくけこhij + want: "abcdefxxxxxefgxxxxxhij", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + s, err := NewScrubber(tc.customMatch, tc.customReplacement, tc.ip, tc.email) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := s.Scrub(tc.input); got != tc.want { + t.Errorf("got %q, want %q", got, tc.want) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/testutil/testutil.go b/aws/logs_monitoring_go/internal/testutil/testutil.go new file mode 100644 index 000000000..5ee3815a3 --- /dev/null +++ b/aws/logs_monitoring_go/internal/testutil/testutil.go @@ -0,0 +1,92 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package testutil + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/base64" + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +const ARN = "arn:aws:lambda:us-east-1:123456789012:function:forwarder" + +func LambdaContext(t *testing.T) context.Context { + t.Helper() + return lambdacontext.NewContext(t.Context(), &lambdacontext.LambdaContext{ + InvokedFunctionArn: ARN, + }) +} + +func LambdaOrigin() model.LambdaOrigin { + return model.LambdaOrigin{ARN: ARN} +} + +func EmptyConfig() *config.Config { + return &config.Config{} +} + +func MustGzipJSON(t *testing.T, v any) []byte { + t.Helper() + + raw, err := json.Marshal(v) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + if _, err := w.Write(raw); err != nil { + t.Fatalf("gzip write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("gzip close: %v", err) + } + + return buf.Bytes() +} + +func MustCloudwatchEvent(t *testing.T, data []byte) json.RawMessage { + t.Helper() + + evt := events.CloudwatchLogsEvent{ + AWSLogs: events.CloudwatchLogsRawData{ + Data: base64.StdEncoding.EncodeToString(data), + }, + } + + raw, err := json.Marshal(evt) + if err != nil { + t.Fatalf("marshal cloudwatch event: %v", err) + } + return raw +} + +func MustKinesisEvent(t *testing.T, records ...[]byte) json.RawMessage { + t.Helper() + + var evt events.KinesisEvent + for _, data := range records { + evt.Records = append(evt.Records, events.KinesisEventRecord{ + Kinesis: events.KinesisRecord{ + Data: data, + }, + }) + } + + raw, err := json.Marshal(evt) + if err != nil { + t.Fatalf("marshal kinesis event: %v", err) + } + return raw +}