Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R
var storage storing.Storage
var storageErr error
if cfg.StoreOnFail {
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName}
if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil {
storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName, SQSQueue: cfg.SQSQueueURL}
if storage, storageErr = storing.NewStorage(storageOpts); storageErr != nil {
return nil, fmt.Errorf("new storage: %w", storageErr)
}
}
Expand Down
9 changes: 5 additions & 4 deletions aws/logs_monitoring_go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.26

require (
github.com/aws/aws-lambda-go v1.53.0
github.com/aws/aws-sdk-go-v2 v1.41.5
github.com/aws/aws-sdk-go-v2 v1.42.0
github.com/aws/aws-sdk-go-v2/config v1.32.12
github.com/aws/aws-sdk-go-v2/service/kms v1.50.4
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0
Expand All @@ -19,19 +19,20 @@ require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
github.com/aws/smithy-go v1.24.2 // indirect
github.com/aws/smithy-go v1.27.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions aws/logs_monitoring_go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/aws/aws-lambda-go v1.53.0 h1:uAMv6W/vCP/L494BAUSxe+8KVBIPK+SGPyapFt3F
github.com/aws/aws-lambda-go v1.53.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY=
github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA=
github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI=
github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0=
Expand All @@ -12,8 +14,12 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqb
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ=
Expand All @@ -34,6 +40,8 @@ github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4 h1:9aZbO86sraeCIHHCp
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4/go.mod h1:cxiXDhEzIq7Xx1BtmC4lGBK3SwAZ79+EUWiKawYHo14=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8/go.mod h1:LXypKvk85AROkKhOG6/YEcHFPoX+prKTowKnVdcaIxE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 h1:6DQ95Zq5xPUSYm6KWcrar7zvreqAMFmyynYCcmzv6yc=
github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0/go.mod h1:d7eKytKiwDFJeNAP4VWo47VCiNM9z539tkoCGJ6PjXw=
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4 h1:5Wg8AAAnIWM2LE/0KFGqllZff96bm4dBs+uerYFfReE=
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4/go.mod h1:nph0ypDLWm9D9iA9zOX39W/N+A4GqwzlxA13jzXVD4k=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 h1:kiIDLZ005EcKomYYITtfsjn7dtOwHDOFy7IbPXKek2o=
Expand All @@ -44,6 +52,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 h1:Cng+OOwCHmFljXIxpEVXAGMnBia8
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9/go.mod h1:LrlIndBDdjA/EeXeyNBle+gyCwTlizzW5ycgWnvIxkk=
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8=
github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmdtest v0.4.1-0.20220921163831-55ab3332a786 h1:rcv+Ippz6RAtvaGgKxc+8FQIpxHgsF+HBzPyYL2cyVU=
Expand Down
120 changes: 75 additions & 45 deletions aws/logs_monitoring_go/internal/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,90 +6,120 @@
package batching

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

const (
maxItemSize = 1 * 1024 * 1024
maxBatchSize = 5 * 1024 * 1024
maxItemsPerBatch = 1000
)
// Config holds the limits a Batcher applies while accumulating items.
//
// maxItemSize is the largest marshaled item, in bytes, that is accepted.
// maxBatchSize bounds the running byte-size estimate of a single batch.
// maxItemsPerBatch caps how many items go into one batch; the batcher only
// enforces this cap when the value is non-zero.
type Config struct {
	maxItemSize, maxBatchSize, maxItemsPerBatch int
}

type Batcher struct {
batch [][]byte
func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config {
return Config{
maxItemSize: maxItemSize,
maxBatchSize: maxBatchSize,
maxItemsPerBatch: maxItemsPerBatch,
Comment on lines +23 to +27

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered three solutions for reusing the Batcher.Start() method:

  1. Keep the old functional option pattern and change the in channel from model.LogEntry to any. Too restrictive: we would lose type safety and have to change the Handler signature.
  2. Keep the old functional option pattern and make both the Batcher and the Option generic. Too verbose, e.g. batching.WithMaxItemSize[model.LogEntry](1*1024*1024) three times, and it provides no value since only the Start method leverages genericity.
  3. Delete the old functional option pattern, use a config struct directly, and make the batcher generic.

Opted for 3.

}
}

// Batcher accumulates JSON-encoded items and groups them into batches that
// respect the limits of the embedded Config. T is the type of the values
// read from the input channel passed to Start.
type Batcher[T any] struct {
// Config supplies maxItemSize, maxBatchSize and maxItemsPerBatch.
Config
// batch holds the already-marshaled items waiting to be emitted.
batch []json.RawMessage
// batchSize is the running byte estimate for the pending batch; it starts
// at 2 for the enclosing '[' and ']' and grows by len(item)+1 per item.
batchSize int
}

func New() *Batcher {
return &Batcher{
batch: make([][]byte, 0, maxItemsPerBatch),
// New builds a Batcher for values of type T using the limits in cfg.
//
// The pending-item slice is pre-allocated with capacity cfg.maxItemsPerBatch,
// and the size estimate starts at 2 to cover the array's '[' and ']'.
func New[T any](cfg Config) *Batcher[T] {
	b := new(Batcher[T])
	b.Config = cfg
	b.batch = make([]json.RawMessage, 0, cfg.maxItemsPerBatch)
	b.batchSize = 2 // '[' and ']'
	return b
}

func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error {
func (b *Batcher[T]) Start(ctx context.Context, in <-chan T, out chan<- json.RawMessage) error {
for {
entry, ok, err := concurrent.SafeReader(ctx, in)
if err != nil {
return err
}
v, ok, _ := concurrent.SafeReader(ctx, in)
Comment thread
ge0Aja marked this conversation as resolved.
if !ok {
return b.flush(ctx, out)
batch, constructed, err := b.construct()
if err != nil {
return err
}

if constructed {
if err = concurrent.SafeSender(ctx, out, batch); err != nil {
return err
}
}
break
}

data, err := json.Marshal(entry)
item, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}

if len(data) > maxItemSize {
slog.Warn("log entry exceeds max item size, dropping",
slog.Int("size", len(data)),
slog.Int("max", maxItemSize),
if !b.valid(item) {
slog.Warn("invalid item, dropping",
slog.Int("size", len(item)),
slog.Int("max", b.maxItemSize),
)
continue
}

if b.batchSize+len(data) > maxBatchSize || len(b.batch) >= maxItemsPerBatch {
if err := b.flush(ctx, out); err != nil {
if ok := b.add(item); !ok {
batch, constructed, err := b.construct()
if err != nil {
return err
}
}
_ = b.add(item)
Comment thread
ge0Aja marked this conversation as resolved.

if !constructed {
continue
}

b.batch = append(b.batch, data)
b.batchSize += len(data)
if err = concurrent.SafeSender(ctx, out, batch); err != nil {
return err
}
}
}
return nil
}

func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error {
if len(b.batch) == 0 {
return nil
func (b *Batcher[T]) add(item json.RawMessage) bool {
if (b.maxItemsPerBatch != 0 && len(b.batch) >= b.maxItemsPerBatch) || b.batchSize+len(item)+1 > b.maxBatchSize {
return false
}

payload := assembleBatch(b.batch)
b.batch = b.batch[:0]
b.batchSize = 0
b.batch = append(b.batch, item)
b.batchSize += len(item) + 1
return true
}

return concurrent.SafeSender(ctx, out, payload)
// valid reports whether the marshaled item fits the per-item limit, i.e. its
// byte length does not exceed maxItemSize.
func (b *Batcher[T]) valid(item json.RawMessage) bool {
	tooLarge := len(item) > b.maxItemSize
	return !tooLarge
}

func assembleBatch(entries [][]byte) []byte {
var buf bytes.Buffer
func (b *Batcher[T]) construct() (json.RawMessage, bool, error) {
if len(b.batch) == 0 {
return nil, false, nil
}

buf.WriteByte('[')
for i, entry := range entries {
if i > 0 {
buf.WriteByte(',')
}
buf.Write(entry)
batch, err := json.Marshal(&b.batch)
if err != nil {
return nil, false, fmt.Errorf("marshal: %w", err)
}
buf.WriteByte(']')

return buf.Bytes()
b.reset()
return json.RawMessage(batch), true, nil
}

// reset empties the pending batch and restores the size estimate to its
// starting value.
func (b *Batcher[T]) reset() {
	// 2 is the same starting estimate New uses: the enclosing '[' and ']'.
	b.batchSize = 2
	// Truncate in place so the existing backing array is kept for reuse.
	b.batch = b.batch[:0]
}
72 changes: 29 additions & 43 deletions aws/logs_monitoring_go/internal/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ package batching

import (
"encoding/json"
"strings"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -19,67 +18,54 @@ func TestBatch(t *testing.T) {
t.Parallel()

tests := map[string]struct {
entries []model.LogEntry
wantBatchCount int
wantEntryCounts []int
items []any
wantBatchItems []int
}{
"empty": {
entries: nil,
wantBatchCount: 0,
items: nil,
wantBatchItems: nil,
},
"single entry": {
entries: []model.LogEntry{model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{1},
items: []any{testutil.GenerateJSONLogs(t, 1024)},
wantBatchItems: []int{1},
},
"multiple entries, one batch": {
entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{3},
items: []any{testutil.GenerateJSONLogs(t, 1024), testutil.GenerateJSONLogs(t, 1024), testutil.GenerateJSONLogs(t, 1024)},
wantBatchItems: []int{3},
},
"drop oversized entry": {
entries: func() []model.LogEntry {
entry := model.NewLogEntry()
entry.Message = strings.Repeat("a", maxItemSize+1)
return []model.LogEntry{entry}
}(),
wantBatchCount: 0,
wantEntryCounts: nil,
items: []any{testutil.GenerateJSONLogs(t, 1*1024*1024+1)},
wantBatchItems: nil,
},
"split": {
entries: make([]model.LogEntry, maxItemsPerBatch+1),
wantBatchCount: 2,
wantEntryCounts: []int{maxItemsPerBatch, 1},
items: func() (items []any) {
for range 1001 {
items = append(items, testutil.GenerateJSONLog(t, 100))
}
return
}(),
wantBatchItems: []int{1000, 1},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

in := make(chan model.LogEntry, len(tc.entries))
out := make(chan []byte, len(tc.wantEntryCounts))
for _, entry := range tc.entries {
in <- entry
}
close(in)
cfg := Config{maxItemSize: 1 * 1024 * 1024, maxBatchSize: 5 * 1024 * 1024, maxItemsPerBatch: 1000}
batcher := New[any](cfg)
in := testutil.ToChannel(t, tc.items)
out := make(chan json.RawMessage, len(tc.wantBatchItems))

batcher := New()
err := batcher.Batch(t.Context(), in, out)
require.NoError(t, err)
err := batcher.Start(t.Context(), in, out)
close(out)

var batches [][]byte
for b := range out {
batches = append(batches, b)
}

require.Len(t, batches, tc.wantBatchCount)

for i, wantCount := range tc.wantEntryCounts {
var entries []model.LogEntry
require.NoError(t, json.Unmarshal(batches[i], &entries))
assert.Len(t, entries, wantCount)
got := testutil.Drain(t, out)
require.NoError(t, err)
for i := range len(tc.wantBatchItems) {
var gotItems []json.RawMessage
_ = json.Unmarshal(got[i], &gotItems)
assert.Equal(t, len(gotItems), tc.wantBatchItems[i])
}
})
}
Expand Down
3 changes: 3 additions & 0 deletions aws/logs_monitoring_go/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
EnvIncludeAtMatch = "INCLUDE_AT_MATCH"
EnvExcludeAtMatch = "EXCLUDE_AT_MATCH"
EnvS3RetryBucketName = "DD_S3_BUCKET_NAME"
EnvSQSQueueURL = "DD_SQS_QUEUE_URL"
EnvStoreFailedEvents = "DD_STORE_FAILED_EVENTS"
ForwarderVersion = "6.0"
)
Expand Down Expand Up @@ -76,6 +77,7 @@ type Config struct {
ScrubEmail bool
StoreOnFail bool
S3RetryBucketName string
SQSQueueURL string
}

func Load() (*Config, error) {
Expand Down Expand Up @@ -136,6 +138,7 @@ func (c *Config) loadEnv() {

c.StoreOnFail = envOrDefaultBool(EnvStoreFailedEvents, false)
c.S3RetryBucketName = envOrDefault(EnvS3RetryBucketName, "")
c.SQSQueueURL = envOrDefault(EnvSQSQueueURL, "")

c.ScrubbingReplacement = envOrDefault(EnvScrubbingRuleReplacement, "")
c.ScrubIP = envOrDefaultBool(EnvRedactIP, false)
Expand Down
Loading
Loading