Skip to content

Commit 002e0bb

Browse files
committed
refactor(go-forwarder): decouple key resolution, cache clients
share AWS config between sdk clients misc feat(go-forwarder): add SQS as failed event storage delete ChangeMessageVisibilityBatch API call Add functional option pattern to batching and use batching inside SQS Delete old Store implementation + change message on debug
1 parent 507a75b commit 002e0bb

14 files changed

Lines changed: 726 additions & 334 deletions

File tree

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawM
7878

7979
var storage storing.Storage
8080
if cfg.StoreOnFail {
81-
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName}
82-
if storage, err = storing.NewStorage(ctx, storageOpts); err != nil {
81+
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName, SQSQueue: cfg.SQSQueueURL}
82+
if storage, err = storing.NewStorage(storageOpts); err != nil {
8383
return fmt.Errorf("new storage: %w", err)
8484
}
8585
}

aws/logs_monitoring_go/go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.26
44

55
require (
66
github.com/aws/aws-lambda-go v1.53.0
7-
github.com/aws/aws-sdk-go-v2 v1.41.5
7+
github.com/aws/aws-sdk-go-v2 v1.42.0
88
github.com/aws/aws-sdk-go-v2/config v1.32.12
99
github.com/aws/aws-sdk-go-v2/service/kms v1.50.4
1010
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0
@@ -19,19 +19,20 @@ require (
1919
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
2020
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect
2121
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect
22-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect
23-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect
22+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect
23+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect
2424
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect
2525
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 // indirect
2626
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect
2727
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 // indirect
2828
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect
2929
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect
3030
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
31+
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 // indirect
3132
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect
3233
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
3334
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
34-
github.com/aws/smithy-go v1.24.2 // indirect
35+
github.com/aws/smithy-go v1.27.1 // indirect
3536
github.com/davecgh/go-spew v1.1.1 // indirect
3637
github.com/google/go-cmp v0.7.0 // indirect
3738
github.com/pmezard/go-difflib v1.0.0 // indirect

aws/logs_monitoring_go/go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/aws/aws-lambda-go v1.53.0 h1:uAMv6W/vCP/L494BAUSxe+8KVBIPK+SGPyapFt3F
22
github.com/aws/aws-lambda-go v1.53.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
33
github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY=
44
github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
5+
github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA=
6+
github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg=
57
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o=
68
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI=
79
github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0=
@@ -12,8 +14,12 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqb
1214
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE=
1315
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4=
1416
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c=
17+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko=
18+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc=
1519
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A=
1620
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps=
21+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY=
22+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA=
1723
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw=
1824
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
1925
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ=
@@ -34,6 +40,8 @@ github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4 h1:9aZbO86sraeCIHHCp
3440
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4/go.mod h1:cxiXDhEzIq7Xx1BtmC4lGBK3SwAZ79+EUWiKawYHo14=
3541
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow=
3642
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8/go.mod h1:LXypKvk85AROkKhOG6/YEcHFPoX+prKTowKnVdcaIxE=
43+
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 h1:6DQ95Zq5xPUSYm6KWcrar7zvreqAMFmyynYCcmzv6yc=
44+
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0/go.mod h1:d7eKytKiwDFJeNAP4VWo47VCiNM9z539tkoCGJ6PjXw=
3745
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4 h1:5Wg8AAAnIWM2LE/0KFGqllZff96bm4dBs+uerYFfReE=
3846
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4/go.mod h1:nph0ypDLWm9D9iA9zOX39W/N+A4GqwzlxA13jzXVD4k=
3947
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 h1:kiIDLZ005EcKomYYITtfsjn7dtOwHDOFy7IbPXKek2o=
@@ -44,6 +52,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 h1:Cng+OOwCHmFljXIxpEVXAGMnBia8
4452
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9/go.mod h1:LrlIndBDdjA/EeXeyNBle+gyCwTlizzW5ycgWnvIxkk=
4553
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
4654
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
55+
github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8=
56+
github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
4757
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4858
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4959
github.com/google/go-cmdtest v0.4.1-0.20220921163831-55ab3332a786 h1:rcv+Ippz6RAtvaGgKxc+8FQIpxHgsF+HBzPyYL2cyVU=

aws/logs_monitoring_go/internal/batching/batcher.go

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package batching
77

88
import (
9-
"bytes"
109
"context"
1110
"encoding/json"
1211
"fmt"
@@ -17,79 +16,137 @@ import (
1716
)
1817

1918
const (
20-
maxItemSize = 1 * 1024 * 1024
21-
maxBatchSize = 5 * 1024 * 1024
22-
maxItemsPerBatch = 1000
19+
defaultMaxItemSize = 1 * 1024 * 1024
20+
defaultMaxBatchSize = 5 * 1024 * 1024
21+
defaultMaxItemsPerBatch = 1000
2322
)
2423

2524
type Batcher struct {
26-
batch [][]byte
27-
batchSize int
25+
maxItemSize int
26+
maxBatchSize int
27+
maxItemsPerBatch int
28+
batch []json.RawMessage
29+
batchSize int
2830
}
2931

30-
func New() *Batcher {
31-
return &Batcher{
32-
batch: make([][]byte, 0, maxItemsPerBatch),
32+
func New(opts ...Option) *Batcher {
33+
b := &Batcher{
34+
maxItemSize: defaultMaxItemSize,
35+
maxBatchSize: defaultMaxBatchSize,
36+
maxItemsPerBatch: defaultMaxItemsPerBatch,
37+
batchSize: 2, // '[' and ']'
3338
}
39+
40+
for _, opt := range opts {
41+
opt(b)
42+
}
43+
44+
b.batch = make([]json.RawMessage, 0, b.maxItemsPerBatch)
45+
return b
3446
}
3547

36-
func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error {
48+
func (b *Batcher) Start(ctx context.Context, in <-chan model.LogEntry, out chan<- json.RawMessage) error {
3749
for {
38-
entry, ok, err := concurrent.SafeReader(ctx, in)
39-
if err != nil {
40-
return err
41-
}
50+
v, ok, _ := concurrent.SafeReader(ctx, in)
4251
if !ok {
43-
return b.flush(ctx, out)
52+
batch, err := b.construct()
53+
if err != nil {
54+
return err
55+
}
56+
if err = concurrent.SafeSender(ctx, out, batch); err != nil {
57+
return err
58+
}
59+
break
4460
}
4561

46-
data, err := json.Marshal(entry)
62+
item, err := json.Marshal(v)
4763
if err != nil {
4864
return fmt.Errorf("marshal: %w", err)
4965
}
5066

51-
if len(data) > maxItemSize {
52-
slog.Warn("log entry exceeds max item size, dropping",
53-
slog.Int("size", len(data)),
54-
slog.Int("max", maxItemSize),
67+
if !b.valid(item) {
68+
slog.Warn("invalid item, dropping",
69+
slog.Int("size", len(item)),
70+
slog.Int("max", b.maxItemSize),
5571
)
5672
continue
5773
}
5874

59-
if b.batchSize+len(data) > maxBatchSize || len(b.batch) >= maxItemsPerBatch {
60-
if err := b.flush(ctx, out); err != nil {
75+
if ok := b.add(item); !ok {
76+
batch, err := b.construct()
77+
if err != nil {
78+
return err
79+
}
80+
if err = concurrent.SafeSender(ctx, out, batch); err != nil {
6181
return err
6282
}
83+
_ = b.add(item)
84+
}
85+
}
86+
return nil
87+
}
88+
89+
func (b *Batcher) StartSlice(items []json.RawMessage) ([]json.RawMessage, error) {
90+
var batchedItems []json.RawMessage
91+
for _, item := range items {
92+
if !b.valid(item) {
93+
slog.Warn("invalid item, dropping",
94+
slog.Int("size", len(item)),
95+
slog.Int("max", b.maxItemSize),
96+
)
97+
continue
98+
}
99+
100+
if ok := b.add(item); !ok {
101+
batch, err := b.construct()
102+
if err != nil {
103+
return nil, err
104+
}
105+
batchedItems = append(batchedItems, batch)
106+
_ = b.add(item)
63107
}
108+
}
109+
110+
batch, err := b.construct()
111+
if err != nil {
112+
return nil, err
113+
}
64114

65-
b.batch = append(b.batch, data)
66-
b.batchSize += len(data)
115+
if batch != nil {
116+
batchedItems = append(batchedItems, batch)
67117
}
118+
return batchedItems, nil
68119
}
69120

70-
func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error {
71-
if len(b.batch) == 0 {
72-
return nil
121+
func (b *Batcher) add(item json.RawMessage) bool {
122+
if len(b.batch) >= b.maxItemsPerBatch || b.batchSize+len(item)+1 > b.maxBatchSize {
123+
return false
73124
}
74125

75-
payload := assembleBatch(b.batch)
76-
b.batch = b.batch[:0]
77-
b.batchSize = 0
126+
b.batch = append(b.batch, item)
127+
b.batchSize += len(item) + 1 // ','
128+
return true
129+
}
78130

79-
return concurrent.SafeSender(ctx, out, payload)
131+
func (b *Batcher) valid(item json.RawMessage) bool {
132+
return len(item) <= b.maxItemSize
80133
}
81134

82-
func assembleBatch(entries [][]byte) []byte {
83-
var buf bytes.Buffer
135+
func (b *Batcher) construct() (json.RawMessage, error) {
136+
if len(b.batch) == 0 {
137+
return nil, nil
138+
}
84139

85-
buf.WriteByte('[')
86-
for i, entry := range entries {
87-
if i > 0 {
88-
buf.WriteByte(',')
89-
}
90-
buf.Write(entry)
140+
batch, err := json.Marshal(&b.batch)
141+
if err != nil {
142+
return nil, fmt.Errorf("marshal: %w", err)
91143
}
92-
buf.WriteByte(']')
93144

94-
return buf.Bytes()
145+
b.reset()
146+
return json.RawMessage(batch), nil
147+
}
148+
149+
func (b *Batcher) reset() {
150+
b.batch = b.batch[:0]
151+
b.batchSize = 2
95152
}

aws/logs_monitoring_go/internal/batching/batcher_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,16 @@ func TestBatch(t *testing.T) {
4040
"drop oversized entry": {
4141
entries: func() []model.LogEntry {
4242
entry := model.NewLogEntry()
43-
entry.Message = strings.Repeat("a", maxItemSize+1)
43+
entry.Message = strings.Repeat("a", 1*1024*1024+1)
4444
return []model.LogEntry{entry}
4545
}(),
4646
wantBatchCount: 0,
4747
wantEntryCounts: nil,
4848
},
4949
"split": {
50-
entries: make([]model.LogEntry, maxItemsPerBatch+1),
50+
entries: make([]model.LogEntry, 1001),
5151
wantBatchCount: 2,
52-
wantEntryCounts: []int{maxItemsPerBatch, 1},
52+
wantEntryCounts: []int{1000, 1},
5353
},
5454
}
5555

@@ -58,14 +58,14 @@ func TestBatch(t *testing.T) {
5858
t.Parallel()
5959

6060
in := make(chan model.LogEntry, len(tc.entries))
61-
out := make(chan []byte, len(tc.wantEntryCounts))
61+
out := make(chan json.RawMessage, len(tc.wantEntryCounts))
6262
for _, entry := range tc.entries {
6363
in <- entry
6464
}
6565
close(in)
6666

6767
batcher := New()
68-
err := batcher.Batch(t.Context(), in, out)
68+
err := batcher.Start(t.Context(), in, out)
6969
require.NoError(t, err)
7070
close(out)
7171

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package batching
7+
8+
type Option func(*Batcher)
9+
10+
func WithMaxItemSize(n int) Option {
11+
return func(b *Batcher) {
12+
b.maxItemSize = n
13+
}
14+
}
15+
16+
func WithMaxBatchSize(n int) Option {
17+
return func(b *Batcher) {
18+
b.maxBatchSize = n
19+
}
20+
}
21+
22+
func WithMaxItemsPerBatch(n int) Option {
23+
return func(b *Batcher) {
24+
b.maxItemsPerBatch = n
25+
}
26+
}

aws/logs_monitoring_go/internal/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
EnvIncludeAtMatch = "INCLUDE_AT_MATCH"
4343
EnvExcludeAtMatch = "EXCLUDE_AT_MATCH"
4444
EnvS3RetryBucketName = "DD_S3_BUCKET_NAME"
45+
EnvSQSQueueURL = "DD_SQS_QUEUE_URL"
4546
EnvStoreFailedEvents = "DD_STORE_FAILED_EVENTS"
4647
ForwarderVersion = "6.0"
4748
)
@@ -76,6 +77,7 @@ type Config struct {
7677
ScrubEmail bool
7778
StoreOnFail bool
7879
S3RetryBucketName string
80+
SQSQueueURL string
7981
}
8082

8183
func Load() (*Config, error) {
@@ -136,6 +138,7 @@ func (c *Config) loadEnv() {
136138

137139
c.StoreOnFail = envOrDefaultBool(EnvStoreFailedEvents, false)
138140
c.S3RetryBucketName = envOrDefault(EnvS3RetryBucketName, "")
141+
c.SQSQueueURL = envOrDefault(EnvSQSQueueURL, "")
139142

140143
c.ScrubbingReplacement = envOrDefault(EnvScrubbingRuleReplacement, "")
141144
c.ScrubIP = envOrDefaultBool(EnvRedactIP, false)

0 commit comments

Comments
 (0)