Skip to content

Commit b14650c

Browse files
authored
[AWSINTS-3516] refactor(go-forwarder) (#1115)
1 parent 1feb74f commit b14650c

40 files changed

Lines changed: 1473 additions & 1269 deletions

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,49 @@ package main
88
import (
99
"context"
1010
"encoding/json"
11-
"log/slog"
11+
"errors"
1212

1313
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
1414
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
1516
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
1617
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
1718

1819
"github.com/aws/aws-lambda-go/lambda"
1920
)
2021

2122
func main() {
22-
ctx := context.Background()
23-
cfg, err := config.Load(ctx)
23+
cfg, err := config.Load()
2424
if err != nil {
25-
slog.Error("config load failed", slog.Any("error", err))
26-
return
25+
panic(err)
2726
}
27+
err = cfg.ResolveAPIKey(context.Background())
28+
if err != nil {
29+
panic(err)
30+
}
31+
err = cfg.ValidateAPIKey(context.Background())
32+
if err != nil {
33+
panic(err)
34+
}
35+
36+
cwHandler := handling.NewCloudwatch(cfg)
37+
kinesisHandler := handling.NewKinesis(cfg)
38+
s3Handler := handling.NewS3(cfg)
39+
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
40+
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
41+
handling.Register(parsing.InvocationSourceS3, s3Handler)
2842

2943
lambda.Start(handleRequest(cfg))
3044
}
3145

32-
func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error {
46+
func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
3347
return func(ctx context.Context, event json.RawMessage) error {
34-
invocationSource := parsing.DetectInvocationSource(event)
35-
switch invocationSource {
36-
case parsing.InvocationSourceCloudwatchLogs:
37-
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatch)
38-
case parsing.InvocationSourceKinesis:
39-
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
40-
case parsing.InvocationSourceS3:
41-
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
42-
default:
43-
slog.Error("unsupported invocation source", slog.String("source", invocationSource.String()))
44-
return nil
48+
invocation := parsing.DetectInvocationSource(event)
49+
if invocation == parsing.InvocationSourceUnknown {
50+
return errors.New("unknown invocation")
4551
}
52+
53+
run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
54+
return pipeline.Start(ctx, event, run)
4655
}
4756
}

aws/logs_monitoring_go/internal/processing/batching.go renamed to aws/logs_monitoring_go/internal/batching/batcher.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/).
44
// Copyright 2026-Present Datadog, Inc.
55

6-
package processing
6+
package batching
77

88
import (
99
"bytes"
@@ -12,6 +12,7 @@ import (
1212
"log/slog"
1313

1414
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
1516
)
1617

1718
const (
@@ -20,18 +21,18 @@ const (
2021
maxItemsPerBatch = 1000
2122
)
2223

23-
type Batcher[T any] struct {
24+
type Batcher struct {
2425
batch [][]byte
2526
batchSize int
2627
}
2728

28-
func NewBatcher[T any]() *Batcher[T] {
29-
return &Batcher[T]{
29+
func NewBatcher() *Batcher {
30+
return &Batcher{
3031
batch: make([][]byte, 0, maxItemsPerBatch),
3132
}
3233
}
3334

34-
func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte) error {
35+
func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error {
3536
for {
3637
entry, ok, err := concurrent.SafeReader(ctx, in)
3738
if err != nil {
@@ -66,7 +67,7 @@ func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte)
6667
}
6768
}
6869

69-
func (b *Batcher[T]) flush(ctx context.Context, out chan<- []byte) error {
70+
func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error {
7071
if len(b.batch) == 0 {
7172
return nil
7273
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package batching
7+
8+
import (
9+
"encoding/json"
10+
"strings"
11+
"testing"
12+
13+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
14+
)
15+
16+
func TestBatch(t *testing.T) {
17+
t.Parallel()
18+
19+
tests := map[string]struct {
20+
entries []model.LogEntry
21+
wantBatchCount int
22+
wantEntryCounts []int
23+
}{
24+
"empty": {
25+
entries: nil,
26+
wantBatchCount: 0,
27+
},
28+
"single entry": {
29+
entries: []model.LogEntry{model.NewLogEntry()},
30+
wantBatchCount: 1,
31+
wantEntryCounts: []int{1},
32+
},
33+
"multiple entries, one batch": {
34+
entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()},
35+
wantBatchCount: 1,
36+
wantEntryCounts: []int{3},
37+
},
38+
"drop oversized entry": {
39+
entries: func() []model.LogEntry {
40+
entry := model.NewLogEntry()
41+
entry.Message = strings.Repeat("a", maxItemSize+1)
42+
return []model.LogEntry{entry}
43+
}(),
44+
wantBatchCount: 0,
45+
wantEntryCounts: nil,
46+
},
47+
"split": {
48+
entries: make([]model.LogEntry, maxItemsPerBatch+1),
49+
wantBatchCount: 2,
50+
wantEntryCounts: []int{maxItemsPerBatch, 1},
51+
},
52+
}
53+
54+
for name, tc := range tests {
55+
t.Run(name, func(t *testing.T) {
56+
t.Parallel()
57+
58+
in := make(chan model.LogEntry, len(tc.entries))
59+
out := make(chan []byte, len(tc.wantEntryCounts))
60+
for _, entry := range tc.entries {
61+
in <- entry
62+
}
63+
close(in)
64+
65+
batcher := NewBatcher()
66+
err := batcher.Batch(t.Context(), in, out)
67+
if err != nil {
68+
t.Fatalf("unexpected error: %v", err)
69+
}
70+
close(out)
71+
72+
var batches [][]byte
73+
for b := range out {
74+
batches = append(batches, b)
75+
}
76+
77+
if len(batches) != tc.wantBatchCount {
78+
t.Fatalf("Batch() got %d batches, want %d", len(batches), tc.wantBatchCount)
79+
}
80+
81+
for i, wantCount := range tc.wantEntryCounts {
82+
var entries []model.LogEntry
83+
if err := json.Unmarshal(batches[i], &entries); err != nil {
84+
t.Fatalf("Batch() failed to unmarshal batch %d: %v", i, err)
85+
}
86+
if len(entries) != wantCount {
87+
t.Errorf("Batch() batch %d: got %d entries, want %d", i, len(entries), wantCount)
88+
}
89+
}
90+
})
91+
}
92+
}

aws/logs_monitoring_go/internal/config/apikey.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type apiKeyResolver struct {
3030
resolve func(ctx context.Context, value string) (string, error)
3131
}
3232

33-
func (c *Config) resolveAPIKey(ctx context.Context) error {
33+
func (c *Config) ResolveAPIKey(ctx context.Context) error {
3434
resolvers := []apiKeyResolver{
3535
{"DD_API_KEY_SECRET_ARN", c.resolveAPIKeyFromSecretsManager},
3636
{"DD_API_KEY_SSM_NAME", c.resolveAPIKeyFromSSM},
@@ -55,7 +55,7 @@ func (c *Config) resolveAPIKey(ctx context.Context) error {
5555
return errors.New("no API key configured: set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/")
5656
}
5757

58-
func (c *Config) validateAPIKey(ctx context.Context) error {
58+
func (c *Config) ValidateAPIKey(ctx context.Context) error {
5959
if c.APIKey == "" {
6060
return fmt.Errorf("set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/: %w", ErrMissingAPIKey)
6161
}

aws/logs_monitoring_go/internal/config/config.go

Lines changed: 46 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
package config
77

88
import (
9-
"context"
9+
"errors"
1010
"fmt"
11-
"log/slog"
1211
"regexp"
12+
13+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering"
14+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
1316
)
1417

1518
const ForwarderVersion = "6.0"
@@ -23,96 +26,65 @@ type Config struct {
2326
UseFIPS bool
2427
Source string
2528
Host string
26-
CustomTags string
27-
Scrubbing ScrubbingConfig
28-
Filtering FilteringConfig
29+
Tags model.Tags
30+
Service string
31+
Scrubber *scrubbing.Scrubber
32+
Filter *filtering.Filter
2933
S3MultilineLogRegex *regexp.Regexp
3034
}
3135

32-
type ScrubbingConfig struct {
33-
ScrubIP bool
34-
ScrubEmail bool
35-
CustomRule string
36-
CustomReplacement string
37-
}
38-
39-
type FilteringConfig struct {
40-
IncludePattern string
41-
ExcludePattern string
42-
}
43-
44-
func Load(ctx context.Context) (*Config, error) {
45-
initLogger(envOrDefault("DD_LOG_LEVEL", "INFO"))
36+
func Load() (*Config, error) {
37+
logLevel := envOrDefault("DD_LOG_LEVEL", "INFO")
38+
initLogger(logLevel)
4639
logDroppedEnvVars()
4740

48-
cfg := loadConfig()
49-
slog.Debug("config loaded", "config", cfg)
41+
var cfg Config
42+
cfg.LogLevel = logLevel
43+
cfg.loadEnv()
44+
cfg.extractFromEnv()
5045

51-
if err := cfg.resolveAPIKey(ctx); err != nil {
52-
return nil, fmt.Errorf("resolving API key: %w", err)
53-
}
46+
err := cfg.compileS3MultilineLogRegex()
47+
48+
scrubber, scrubbingErr := scrubbing.NewScrubber(
49+
envOrDefault("DD_SCRUBBING_RULE", ""),
50+
envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""),
51+
envOrDefaultBool("REDACT_IP", false),
52+
envOrDefaultBool("REDACT_EMAIL", false),
53+
)
54+
err = errors.Join(err, scrubbingErr)
5455

55-
if err := cfg.validateAPIKey(ctx); err != nil {
56-
return nil, fmt.Errorf("validating API key: %w", err)
56+
filter, filteringErr := filtering.NewFilter(
57+
envOrDefault("INCLUDE_AT_MATCH", ""),
58+
envOrDefault("EXCLUDE_AT_MATCH", ""),
59+
)
60+
err = errors.Join(err, filteringErr)
61+
if err != nil {
62+
return nil, err
5763
}
5864

59-
return cfg, nil
65+
cfg.Scrubber = scrubber
66+
cfg.Filter = filter
67+
return &cfg, nil
6068
}
6169

62-
func loadConfig() *Config {
63-
S3MultilineLogRegex := loadS3MultilineLogRegex()
64-
site := envOrDefault("DD_SITE", "datadoghq.com")
65-
66-
return &Config{
67-
Site: site,
68-
IntakeURL: envOrDefault("DD_URL", "https://http-intake.logs."+site+"/api/v2/logs"),
69-
APIURL: envOrDefault("DD_API_URL", "https://api."+site),
70-
LogLevel: envOrDefault("DD_LOG_LEVEL", "INFO"),
71-
UseFIPS: envOrDefaultBool("DD_USE_FIPS", false),
72-
Source: envOrDefault("DD_SOURCE", ""),
73-
Host: envOrDefault("DD_HOST", ""),
74-
CustomTags: envOrDefault("DD_TAGS", ""),
75-
Scrubbing: ScrubbingConfig{
76-
ScrubIP: envOrDefaultBool("REDACT_IP", false),
77-
ScrubEmail: envOrDefaultBool("REDACT_EMAIL", false),
78-
CustomRule: envOrDefault("DD_SCRUBBING_RULE", ""),
79-
CustomReplacement: envOrDefault("DD_SCRUBBING_RULE_REPLACEMENT", ""),
80-
},
81-
Filtering: FilteringConfig{
82-
IncludePattern: envOrDefault("INCLUDE_AT_MATCH", ""),
83-
ExcludePattern: envOrDefault("EXCLUDE_AT_MATCH", ""),
84-
},
85-
S3MultilineLogRegex: S3MultilineLogRegex,
86-
}
70+
func (c *Config) loadEnv() {
71+
c.Site = envOrDefault("DD_SITE", "datadoghq.com")
72+
c.IntakeURL = envOrDefault("DD_URL", "https://http-intake.logs."+c.Site+"/api/v2/logs")
73+
c.APIURL = envOrDefault("DD_API_URL", "https://api."+c.Site)
74+
c.UseFIPS = envOrDefaultBool("DD_USE_FIPS", false)
75+
c.Source = envOrDefault("DD_SOURCE", "")
76+
c.Host = envOrDefault("DD_HOST", "")
8777
}
8878

89-
func loadS3MultilineLogRegex() *regexp.Regexp {
79+
func (c *Config) compileS3MultilineLogRegex() error {
9080
pattern := envOrDefault("DD_MULTILINE_LOG_REGEX_PATTERN", "")
9181
if pattern == "" {
9282
return nil
9383
}
94-
9584
re, err := regexp.Compile(pattern)
9685
if err != nil {
97-
slog.Error("invalid multiline log pattern", slog.String("pattern", pattern), slog.Any("error", err))
98-
return nil
86+
return fmt.Errorf("compile multiline log regex: %w", err)
9987
}
100-
101-
return re
102-
}
103-
104-
func (c Config) LogValue() slog.Value {
105-
return slog.GroupValue(
106-
slog.String("site", c.Site),
107-
slog.String("intakeUrl", c.IntakeURL),
108-
slog.String("apiUrl", c.APIURL),
109-
slog.String("loglevel", c.LogLevel),
110-
slog.Bool("fips", c.UseFIPS),
111-
slog.Bool("redactIP", c.Scrubbing.ScrubIP),
112-
slog.Bool("redactEmail", c.Scrubbing.ScrubEmail),
113-
slog.Bool("customScrubbing", c.Scrubbing.CustomRule != ""),
114-
slog.Bool("includeFilter", c.Filtering.IncludePattern != ""),
115-
slog.Bool("excludeFilter", c.Filtering.ExcludePattern != ""),
116-
slog.Bool("multilineRegex", c.S3MultilineLogRegex != nil),
117-
)
88+
c.S3MultilineLogRegex = re
89+
return nil
11890
}

0 commit comments

Comments
 (0)