Skip to content

Commit b75208a

Browse files
authored
feat(go-forwarder): add S3 retry (#1143)
* refactor(go-forwarder): decoupling config.Config (#1144) * refactor(go-forwarder): stop sending on first error (#1147)
1 parent f72291e commit b75208a

37 files changed

Lines changed: 1042 additions & 625 deletions

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@ package main
88
import (
99
"context"
1010
"encoding/json"
11+
"errors"
1112
"fmt"
1213
"log"
1314

1415
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
16+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering"
17+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
18+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
1519
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/httpclient"
1620
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
1721
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
22+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
23+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/storing"
1824

1925
"github.com/aws/aws-lambda-go/lambda"
2026
)
@@ -52,6 +58,42 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
5258
if err != nil {
5359
return fmt.Errorf("parse: %w", err)
5460
}
55-
return pipeline.Start(ctx, parsed, cfg)
61+
62+
if len(parsed) == 0 {
63+
return errors.New("no events to process")
64+
}
65+
66+
filterer := filtering.NewFilterer(cfg.FilterInclude, cfg.FilterExclude)
67+
scrubber := scrubbing.NewScrubber(cfg.ScrubbingRegex, cfg.ScrubbingReplacement, cfg.ScrubIP, cfg.ScrubEmail)
68+
handlerCfg := handling.Config{
69+
Host: cfg.Host,
70+
Service: cfg.Service,
71+
Source: cfg.Source,
72+
Tags: cfg.Tags,
73+
S3MultilineLogRegex: cfg.S3MultilineLogRegex,
74+
UseFIPS: cfg.UseFIPS,
75+
}
76+
77+
forwarderCfg := forwarding.Config{
78+
APIKey: cfg.APIKey,
79+
IntakeURL: cfg.IntakeURL,
80+
CompressionLevel: cfg.CompressionLevel,
81+
}
82+
83+
var storage storing.Storage
84+
if cfg.StoreOnFail {
85+
storageOpts := storing.Options{FIPS: cfg.UseFIPS, S3Bucket: cfg.S3RetryBucketName}
86+
if storage, err = storing.NewStorage(ctx, storageOpts); err != nil {
87+
return fmt.Errorf("new storage: %w", err)
88+
}
89+
}
90+
91+
forwarder := forwarding.NewForwarder(
92+
forwarderCfg,
93+
httpclient.Client,
94+
storage,
95+
)
96+
97+
return pipeline.New(handlerCfg, scrubber, filterer, forwarder).Start(ctx, parsed)
5698
}
5799
}

aws/logs_monitoring_go/internal/batching/batcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"bytes"
1010
"context"
1111
"encoding/json"
12+
"fmt"
1213
"log/slog"
1314

1415
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
@@ -26,7 +27,7 @@ type Batcher struct {
2627
batchSize int
2728
}
2829

29-
func NewBatcher() *Batcher {
30+
func New() *Batcher {
3031
return &Batcher{
3132
batch: make([][]byte, 0, maxItemsPerBatch),
3233
}
@@ -44,8 +45,7 @@ func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<
4445

4546
data, err := json.Marshal(entry)
4647
if err != nil {
47-
slog.Warn("failed to marshal log entry, skipped", slog.Any("error", err))
48-
continue
48+
return fmt.Errorf("marshal: %w", err)
4949
}
5050

5151
if len(data) > maxItemSize {

aws/logs_monitoring_go/internal/batching/batcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestBatch(t *testing.T) {
6464
}
6565
close(in)
6666

67-
batcher := NewBatcher()
67+
batcher := New()
6868
err := batcher.Batch(t.Context(), in, out)
6969
require.NoError(t, err)
7070
close(out)

aws/logs_monitoring_go/internal/config/apikey.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (c *Config) ValidateAPIKey(ctx context.Context) error {
5757
}
5858

5959
if len(c.APIKey) != 32 {
60-
return fmt.Errorf("expected 32 characters, got %d. Verify your API key at https://app.%s/organization-settings/api-keys: %w", len(c.APIKey), c.Site, ErrInvalidAPIKey)
60+
return fmt.Errorf("expected 32 characters, got %d: %w", len(c.APIKey), ErrInvalidAPIKey)
6161
}
6262

6363
slog.Debug("validating Datadog API key")
@@ -79,9 +79,10 @@ func (c *Config) ValidateAPIKey(ctx context.Context) error {
7979
defer httpclient.DrainClose(resp)
8080

8181
if resp.StatusCode == http.StatusForbidden {
82-
slog.Warn("invalid Datadog API key", slog.String("url", "https://app."+c.Site+"/organization-settings/api-keys"))
83-
} else if resp.StatusCode != http.StatusOK {
84-
slog.Warn("unexpected response from validation endpoint", slog.String("status", resp.Status))
82+
return errors.New("invalid Datadog API key")
83+
}
84+
if resp.StatusCode != http.StatusOK {
85+
return fmt.Errorf("key validation (http/%d)", resp.StatusCode)
8586
}
8687

8788
return nil

aws/logs_monitoring_go/internal/config/config.go

Lines changed: 55 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@ package config
88
import (
99
"compress/gzip"
1010
"errors"
11-
"fmt"
1211
"log/slog"
12+
"os"
1313
"regexp"
1414

15-
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering"
1615
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
17-
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
1816
)
1917

2018
const (
21-
DefaultSite = "datadoghq.com"
22-
DefaultLogLevel = "INFO"
19+
DefaultSite = "datadoghq.com"
20+
DefaultPort = "443"
21+
DefaultProtocol = "https"
22+
DefaultLogLevel = "INFO"
23+
2324
EnvAPIKey = "DD_API_KEY"
2425
EnvSite = "DD_SITE"
2526
EnvURL = "DD_URL"
@@ -40,97 +41,92 @@ const (
4041
EnvRedactEmail = "REDACT_EMAIL"
4142
EnvIncludeAtMatch = "INCLUDE_AT_MATCH"
4243
EnvExcludeAtMatch = "EXCLUDE_AT_MATCH"
44+
EnvS3RetryBucketName = "DD_S3_BUCKET_NAME"
45+
EnvStoreFailedEvents = "DD_STORE_FAILED_EVENTS"
4346
ForwarderVersion = "6.0"
4447
)
4548

4649
type Config struct {
4750
APIKey string
4851
APIURL string
49-
CompressionLevel int
50-
Filter *filtering.Filter
51-
Host string
5252
IntakeURL string
53-
LogLevel string
54-
Port string
55-
S3MultilineLogRegex *regexp.Regexp
56-
Scrubber *scrubbing.Scrubber
57-
Service string
58-
Site string
53+
CompressionLevel int
5954
SkipServerCertificate bool
55+
UseFIPS bool
56+
Host string
6057
Source string
58+
Service string
6159
Tags model.Tags
62-
UseFIPS bool
63-
UseHTTP bool
60+
S3MultilineLogRegex *regexp.Regexp
61+
FilterInclude *regexp.Regexp
62+
FilterExclude *regexp.Regexp
63+
ScrubbingRegex *regexp.Regexp
64+
ScrubbingReplacement string
65+
ScrubIP bool
66+
ScrubEmail bool
67+
StoreOnFail bool
68+
S3RetryBucketName string
6469
}
6570

6671
func Load() (*Config, error) {
67-
logLevel := envOrDefault(EnvLogLevel, DefaultLogLevel)
68-
initLogger(logLevel)
72+
initLogger(envOrDefault(EnvLogLevel, DefaultLogLevel))
6973
logDroppedEnvVars()
7074

7175
var cfg Config
72-
cfg.LogLevel = logLevel
7376
cfg.loadEnv()
7477
cfg.extractFromEnv()
7578

76-
err := cfg.compileS3MultilineLogRegex()
77-
78-
scrubber, scrubbingErr := scrubbing.NewScrubber(
79-
envOrDefault(EnvScrubbingRule, ""),
80-
envOrDefault(EnvScrubbingRuleReplacement, ""),
81-
envOrDefaultBool(EnvRedactIP, false),
82-
envOrDefaultBool(EnvRedactEmail, false),
83-
)
84-
err = errors.Join(err, scrubbingErr)
85-
86-
filter, filteringErr := filtering.NewFilter(
87-
envOrDefault(EnvIncludeAtMatch, ""),
88-
envOrDefault(EnvExcludeAtMatch, ""),
89-
)
90-
err = errors.Join(err, filteringErr)
91-
if err != nil {
92-
return nil, err
79+
var errs []error
80+
patterns := []struct {
81+
env string
82+
field **regexp.Regexp
83+
}{
84+
{env: EnvScrubbingRule, field: &cfg.ScrubbingRegex},
85+
{env: EnvIncludeAtMatch, field: &cfg.FilterInclude},
86+
{env: EnvExcludeAtMatch, field: &cfg.FilterExclude},
87+
{env: EnvMultilineLogRegex, field: &cfg.S3MultilineLogRegex},
88+
}
89+
for _, pattern := range patterns {
90+
if envValue := os.Getenv(pattern.env); envValue != "" {
91+
re, err := regexp.Compile(envValue)
92+
*pattern.field = re
93+
errs = append(errs, err)
94+
}
9395
}
9496

95-
cfg.Scrubber = scrubber
96-
cfg.Filter = filter
97-
return &cfg, nil
97+
return &cfg, errors.Join(errs...)
9898
}
9999

100100
func (c *Config) loadEnv() {
101-
c.Site = envOrDefault(EnvSite, DefaultSite)
101+
site := envOrDefault(EnvSite, DefaultSite)
102+
port := envOrDefault(EnvPort, DefaultPort)
103+
protocol := DefaultProtocol
104+
102105
c.SkipServerCertificate = envOrDefaultBool(EnvSkipServerCertificate, false)
103-
c.UseHTTP = envOrDefaultBool(EnvUseHTTP, false)
104106

105-
protocol := "https"
106-
if c.UseHTTP {
107+
if envOrDefaultBool(EnvUseHTTP, false) {
107108
protocol = "http"
108109
}
109110

110111
compressionLevel := envOrDefaultInt(EnvCompressionLevel, gzip.DefaultCompression)
111112
if compressionLevel < gzip.HuffmanOnly || compressionLevel > gzip.BestCompression {
112113
slog.Warn("invalid compression level, falling back to default", slog.Int("level", compressionLevel), slog.Int("fallback", gzip.DefaultCompression))
113-
compressionLevel = gzip.BestCompression
114+
compressionLevel = gzip.DefaultCompression
114115
}
115116
c.CompressionLevel = compressionLevel
116117

117-
c.Port = envOrDefault(EnvPort, "443")
118-
c.IntakeURL = envOrDefault(EnvURL, protocol+"://http-intake.logs."+c.Site+":"+c.Port+"/api/v2/logs")
119-
c.APIURL = envOrDefault(EnvAPIURL, protocol+"://api."+c.Site)
118+
c.IntakeURL = envOrDefault(EnvURL, protocol+"://http-intake.logs."+site+":"+port+"/api/v2/logs")
119+
c.APIURL = envOrDefault(EnvAPIURL, protocol+"://api."+site)
120+
120121
c.UseFIPS = envOrDefaultBool(EnvUseFIPS, false)
122+
121123
c.Source = envOrDefault(EnvSource, "")
122124
c.Host = envOrDefault(EnvHost, "")
123-
}
124125

125-
func (c *Config) compileS3MultilineLogRegex() error {
126-
pattern := envOrDefault(EnvMultilineLogRegex, "")
127-
if pattern == "" {
128-
return nil
129-
}
130-
re, err := regexp.Compile(pattern)
131-
if err != nil {
132-
return fmt.Errorf("compile multiline log regex: %w", err)
133-
}
134-
c.S3MultilineLogRegex = re
135-
return nil
126+
c.StoreOnFail = envOrDefaultBool(EnvStoreFailedEvents, false)
127+
c.S3RetryBucketName = envOrDefault(EnvS3RetryBucketName, "")
128+
129+
c.ScrubbingReplacement = envOrDefault(EnvScrubbingRuleReplacement, "")
130+
c.ScrubIP = envOrDefaultBool(EnvRedactIP, false)
131+
c.ScrubEmail = envOrDefaultBool(EnvRedactEmail, false)
136132
}

aws/logs_monitoring_go/internal/config/config_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,27 @@ func TestLoad(t *testing.T) {
2323
wantErr bool
2424
}{
2525
"defaults": {
26-
want: Config{Site: DefaultSite, IntakeURL: defaultURL, APIURL: defaultAPI},
26+
want: Config{IntakeURL: defaultURL, APIURL: defaultAPI},
2727
},
2828
"eu site": {
2929
env: map[string]string{EnvSite: "datadoghq.eu"},
30-
want: Config{Site: "datadoghq.eu", IntakeURL: "https://http-intake.logs.datadoghq.eu:443/api/v2/logs", APIURL: "https://api.datadoghq.eu"},
30+
want: Config{IntakeURL: "https://http-intake.logs.datadoghq.eu:443/api/v2/logs", APIURL: "https://api.datadoghq.eu"},
3131
},
3232
"custom url": {
3333
env: map[string]string{EnvURL: "https://custom.example.com"},
34-
want: Config{Site: DefaultSite, IntakeURL: "https://custom.example.com", APIURL: defaultAPI},
34+
want: Config{IntakeURL: "https://custom.example.com", APIURL: defaultAPI},
3535
},
3636
"source and host": {
3737
env: map[string]string{EnvSource: "custom", EnvHost: "my-host"},
38-
want: Config{Site: DefaultSite, IntakeURL: defaultURL, APIURL: defaultAPI, Source: "custom", Host: "my-host"},
38+
want: Config{IntakeURL: defaultURL, APIURL: defaultAPI, Source: "custom", Host: "my-host"},
3939
},
4040
"fips enabled": {
4141
env: map[string]string{EnvUseFIPS: "true"},
42-
want: Config{Site: DefaultSite, IntakeURL: defaultURL, APIURL: defaultAPI, UseFIPS: true},
42+
want: Config{IntakeURL: defaultURL, APIURL: defaultAPI, UseFIPS: true},
4343
},
4444
"valid multiline regex": {
4545
env: map[string]string{EnvMultilineLogRegex: `\d{4}-\d{2}-\d{2}`},
46-
want: Config{Site: DefaultSite, IntakeURL: defaultURL, APIURL: defaultAPI},
46+
want: Config{IntakeURL: defaultURL, APIURL: defaultAPI},
4747
wantRegex: true,
4848
},
4949
"invalid multiline regex": {
@@ -66,7 +66,6 @@ func TestLoad(t *testing.T) {
6666
}
6767
require.NoError(t, err)
6868

69-
assert.Equal(t, tc.want.Site, got.Site)
7069
assert.Equal(t, tc.want.IntakeURL, got.IntakeURL)
7170
assert.Equal(t, tc.want.APIURL, got.APIURL)
7271
assert.Equal(t, tc.want.Source, got.Source)

aws/logs_monitoring_go/internal/filtering/filter.go

Lines changed: 0 additions & 52 deletions
This file was deleted.

0 commit comments

Comments
 (0)