diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index 98fcf02a9..5558c2b9a 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -67,8 +67,8 @@ func handleRequest(cfg *config.Config) func(ctx context.Context, awsevent json.R var storage storing.Storage var storageErr error if cfg.StoreOnFail { - storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName} - if storage, storageErr = storing.NewStorage(ctx, storageOpts); storageErr != nil { + storageOpts := storing.Options{S3Bucket: cfg.S3RetryBucketName, SQSQueue: cfg.SQSQueueURL} + if storage, storageErr = storing.NewStorage(storageOpts); storageErr != nil { return nil, fmt.Errorf("new storage: %w", storageErr) } } diff --git a/aws/logs_monitoring_go/go.mod b/aws/logs_monitoring_go/go.mod index aec94161a..e3aa0bb9b 100644 --- a/aws/logs_monitoring_go/go.mod +++ b/aws/logs_monitoring_go/go.mod @@ -4,7 +4,7 @@ go 1.26 require ( github.com/aws/aws-lambda-go v1.53.0 - github.com/aws/aws-sdk-go-v2 v1.41.5 + github.com/aws/aws-sdk-go-v2 v1.42.0 github.com/aws/aws-sdk-go-v2/config v1.32.12 github.com/aws/aws-sdk-go-v2/service/kms v1.50.4 github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0 @@ -19,8 +19,8 @@ require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect @@ -28,10 +28,11 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect + github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect - github.com/aws/smithy-go v1.24.2 // indirect + github.com/aws/smithy-go v1.27.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/aws/logs_monitoring_go/go.sum b/aws/logs_monitoring_go/go.sum index fe0c5dc23..70a4ba934 100644 --- a/aws/logs_monitoring_go/go.sum +++ b/aws/logs_monitoring_go/go.sum @@ -2,6 +2,8 @@ github.com/aws/aws-lambda-go v1.53.0 h1:uAMv6W/vCP/L494BAUSxe+8KVBIPK+SGPyapFt3F github.com/aws/aws-lambda-go v1.53.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY= github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= +github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA= +github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI= github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0= @@ -12,8 +14,12 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqb github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ= @@ -34,6 +40,8 @@ github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4 h1:9aZbO86sraeCIHHCp github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4/go.mod h1:cxiXDhEzIq7Xx1BtmC4lGBK3SwAZ79+EUWiKawYHo14= github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow= github.com/aws/aws-sdk-go-v2/service/signin v1.0.8/go.mod h1:LXypKvk85AROkKhOG6/YEcHFPoX+prKTowKnVdcaIxE= +github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0 h1:6DQ95Zq5xPUSYm6KWcrar7zvreqAMFmyynYCcmzv6yc= +github.com/aws/aws-sdk-go-v2/service/sqs v1.44.0/go.mod h1:d7eKytKiwDFJeNAP4VWo47VCiNM9z539tkoCGJ6PjXw= github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4 h1:5Wg8AAAnIWM2LE/0KFGqllZff96bm4dBs+uerYFfReE= github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4/go.mod h1:nph0ypDLWm9D9iA9zOX39W/N+A4GqwzlxA13jzXVD4k= github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 h1:kiIDLZ005EcKomYYITtfsjn7dtOwHDOFy7IbPXKek2o= @@ -44,6 +52,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 h1:Cng+OOwCHmFljXIxpEVXAGMnBia8 github.com/aws/aws-sdk-go-v2/service/sts v1.41.9/go.mod h1:LrlIndBDdjA/EeXeyNBle+gyCwTlizzW5ycgWnvIxkk= github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= +github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8= +github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmdtest v0.4.1-0.20220921163831-55ab3332a786 h1:rcv+Ippz6RAtvaGgKxc+8FQIpxHgsF+HBzPyYL2cyVU= diff --git a/aws/logs_monitoring_go/internal/batching/batcher.go b/aws/logs_monitoring_go/internal/batching/batcher.go index c25b2736f..d3af145f7 100644 --- a/aws/logs_monitoring_go/internal/batching/batcher.go +++ b/aws/logs_monitoring_go/internal/batching/batcher.go @@ -6,90 +6,120 @@ package batching import ( - "bytes" "context" "encoding/json" "fmt" "log/slog" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) -const ( - maxItemSize = 1 * 1024 * 1024 - maxBatchSize = 5 * 1024 * 1024 - maxItemsPerBatch = 1000 -) +type Config struct { + maxItemSize int + maxBatchSize int + maxItemsPerBatch int +} -type Batcher struct { - batch [][]byte +func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config { + return Config{ + maxItemSize: maxItemSize, + maxBatchSize: maxBatchSize, + maxItemsPerBatch: maxItemsPerBatch, + } +} + +type Batcher[T any] struct { + Config + batch []json.RawMessage batchSize int } -func New() *Batcher { - return &Batcher{ - batch: make([][]byte, 0, maxItemsPerBatch), +func New[T any](cfg Config) *Batcher[T] { + return &Batcher[T]{ + Config: cfg, + batch: make([]json.RawMessage, 0, cfg.maxItemsPerBatch), + batchSize: 2, // '[' and ']' } } -func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error { +func (b *Batcher[T]) Start(ctx context.Context, in <-chan T, out chan<- json.RawMessage) error { for { - entry, ok, err := concurrent.SafeReader(ctx, in) - if err != nil { - return err - } + v, ok, _ := concurrent.SafeReader(ctx, in) if !ok { - return b.flush(ctx, out) + batch, constructed, err := b.construct() + if err != nil { + return err + } + + if constructed { + if err = concurrent.SafeSender(ctx, out, batch); err != nil { + return err + } + } + break } - data, err := json.Marshal(entry) + item, err := json.Marshal(v) if err != nil { return fmt.Errorf("marshal: %w", err) } - if len(data) > maxItemSize { - slog.Warn("log entry exceeds max item size, dropping", - slog.Int("size", len(data)), - slog.Int("max", maxItemSize), + if !b.valid(item) { + slog.Warn("invalid item, dropping", + slog.Int("size", len(item)), + slog.Int("max", b.maxItemSize), ) continue } - if b.batchSize+len(data) > maxBatchSize || len(b.batch) >= maxItemsPerBatch { - if err := b.flush(ctx, out); err != nil { + if ok := b.add(item); !ok { + batch, constructed, err := b.construct() + if err != nil { return err } - } + _ = b.add(item) + + if !constructed { + continue + } - b.batch = append(b.batch, data) - b.batchSize += len(data) + if err = concurrent.SafeSender(ctx, out, batch); err != nil { + return err + } + } } + return nil } -func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error { - if len(b.batch) == 0 { - return nil +func (b *Batcher[T]) add(item json.RawMessage) bool { + if (b.maxItemsPerBatch != 0 && len(b.batch) >= b.maxItemsPerBatch) || b.batchSize+len(item)+1 > b.maxBatchSize { + return false } - payload := assembleBatch(b.batch) - b.batch = b.batch[:0] - b.batchSize = 0 + b.batch = append(b.batch, item) + b.batchSize += len(item) + 1 + return true +} - return concurrent.SafeSender(ctx, out, payload) +func (b *Batcher[T]) valid(item json.RawMessage) bool { + return len(item) <= b.maxItemSize } -func assembleBatch(entries [][]byte) []byte { - var buf bytes.Buffer +func (b *Batcher[T]) construct() (json.RawMessage, bool, error) { + if len(b.batch) == 0 { + return nil, false, nil + } - buf.WriteByte('[') - for i, entry := range entries { - if i > 0 { - buf.WriteByte(',') - } - buf.Write(entry) + batch, err := json.Marshal(&b.batch) + if err != nil { + return nil, false, fmt.Errorf("marshal: %w", err) } - buf.WriteByte(']') - return buf.Bytes() + b.reset() + return json.RawMessage(batch), true, nil +} + +func (b *Batcher[T]) reset() { + b.batch = b.batch[:0] + b.batchSize = 2 } diff --git a/aws/logs_monitoring_go/internal/batching/batcher_test.go b/aws/logs_monitoring_go/internal/batching/batcher_test.go index d0083d0e2..53aecbcbf 100644 --- a/aws/logs_monitoring_go/internal/batching/batcher_test.go +++ b/aws/logs_monitoring_go/internal/batching/batcher_test.go @@ -7,10 +7,9 @@ package batching import ( "encoding/json" - "strings" "testing" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -19,37 +18,33 @@ func TestBatch(t *testing.T) { t.Parallel() tests := map[string]struct { - entries []model.LogEntry - wantBatchCount int - wantEntryCounts []int + items []any + wantBatchItems []int }{ "empty": { - entries: nil, - wantBatchCount: 0, + items: nil, + wantBatchItems: nil, }, "single entry": { - entries: []model.LogEntry{model.NewLogEntry()}, - wantBatchCount: 1, - wantEntryCounts: []int{1}, + items: []any{testutil.GenerateJSONLogs(t, 1024)}, + wantBatchItems: []int{1}, }, "multiple entries, one batch": { - entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()}, - wantBatchCount: 1, - wantEntryCounts: []int{3}, + items: []any{testutil.GenerateJSONLogs(t, 1024), testutil.GenerateJSONLogs(t, 1024), testutil.GenerateJSONLogs(t, 1024)}, + wantBatchItems: []int{3}, }, "drop oversized entry": { - entries: func() []model.LogEntry { - entry := model.NewLogEntry() - entry.Message = strings.Repeat("a", maxItemSize+1) - return []model.LogEntry{entry} - }(), - wantBatchCount: 0, - wantEntryCounts: nil, + items: []any{testutil.GenerateJSONLogs(t, 1*1024*1024+1)}, + wantBatchItems: nil, }, "split": { - entries: make([]model.LogEntry, maxItemsPerBatch+1), - wantBatchCount: 2, - wantEntryCounts: []int{maxItemsPerBatch, 1}, + items: func() (items []any) { + for range 1001 { + items = append(items, testutil.GenerateJSONLog(t, 100)) + } + return + }(), + wantBatchItems: []int{1000, 1}, }, } @@ -57,29 +52,20 @@ func TestBatch(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - in := make(chan model.LogEntry, len(tc.entries)) - out := make(chan []byte, len(tc.wantEntryCounts)) - for _, entry := range tc.entries { - in <- entry - } - close(in) + cfg := Config{maxItemSize: 1 * 1024 * 1024, maxBatchSize: 5 * 1024 * 1024, maxItemsPerBatch: 1000} + batcher := New[any](cfg) + in := testutil.ToChannel(t, tc.items) + out := make(chan json.RawMessage, len(tc.wantBatchItems)) - batcher := New() - err := batcher.Batch(t.Context(), in, out) - require.NoError(t, err) + err := batcher.Start(t.Context(), in, out) close(out) - var batches [][]byte - for b := range out { - batches = append(batches, b) - } - - require.Len(t, batches, tc.wantBatchCount) - - for i, wantCount := range tc.wantEntryCounts { - var entries []model.LogEntry - require.NoError(t, json.Unmarshal(batches[i], &entries)) - assert.Len(t, entries, wantCount) + got := testutil.Drain(t, out) + require.NoError(t, err) + for i := range len(tc.wantBatchItems) { + var gotItems []json.RawMessage + _ = json.Unmarshal(got[i], &gotItems) + assert.Equal(t, len(gotItems), tc.wantBatchItems[i]) } }) } diff --git a/aws/logs_monitoring_go/internal/config/config.go b/aws/logs_monitoring_go/internal/config/config.go index 3ea136856..50f0db601 100644 --- a/aws/logs_monitoring_go/internal/config/config.go +++ b/aws/logs_monitoring_go/internal/config/config.go @@ -42,6 +42,7 @@ const ( EnvIncludeAtMatch = "INCLUDE_AT_MATCH" EnvExcludeAtMatch = "EXCLUDE_AT_MATCH" EnvS3RetryBucketName = "DD_S3_BUCKET_NAME" + EnvSQSQueueURL = "DD_SQS_QUEUE_URL" EnvStoreFailedEvents = "DD_STORE_FAILED_EVENTS" ForwarderVersion = "6.0" ) @@ -76,6 +77,7 @@ type Config struct { ScrubEmail bool StoreOnFail bool S3RetryBucketName string + SQSQueueURL string } func Load() (*Config, error) { @@ -136,6 +138,7 @@ func (c *Config) loadEnv() { c.StoreOnFail = envOrDefaultBool(EnvStoreFailedEvents, false) c.S3RetryBucketName = envOrDefault(EnvS3RetryBucketName, "") + c.SQSQueueURL = envOrDefault(EnvSQSQueueURL, "") c.ScrubbingReplacement = envOrDefault(EnvScrubbingRuleReplacement, "") c.ScrubIP = envOrDefaultBool(EnvRedactIP, false) diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index bf44b0480..db04a5747 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -9,10 +9,10 @@ import ( "bytes" "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" - "log/slog" "net/http" "sync" @@ -25,6 +25,14 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + ddRetryHeader = "DD-RETRY-TAG" + maxLogSize = 1 * 1024 * 1024 + maxBatchSize = 5 * 1024 * 1024 + maxLogsPerBatch = 1000 + maxBatchesInMemory = 12 // 12 * 5MiB = 60MiB maximum +) + var bufPool = sync.Pool{ New: func() any { return new(bytes.Buffer) }, } @@ -69,10 +77,11 @@ func (f *Forwarder) Start(ctx context.Context, in <-chan model.LogEntry, storage eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(1 + httpclient.MaxConcurrency) - batches := make(chan []byte) + batches := make(chan json.RawMessage, maxBatchesInMemory) eg.Go(func() error { defer close(batches) - if err := batching.New().Batch(ctx, in, batches); err != nil { + cfg := batching.NewConfig(maxLogSize, maxBatchSize, maxLogsPerBatch) + if err := batching.New[model.LogEntry](cfg).Start(ctx, in, batches); err != nil { return fmt.Errorf("batch: %w", err) } return nil @@ -101,8 +110,8 @@ func (f *Forwarder) Start(ctx context.Context, in <-chan model.LogEntry, storage break } - if err := f.storage.Put(ctx, batch, storageTag); err != nil { - return fmt.Errorf("put: %w", err) + if err := f.storage.Store(ctx, storing.Batch{Data: batch, StorageTag: storageTag}); err != nil { + return fmt.Errorf("store: %w", err) } } return nil @@ -113,33 +122,31 @@ func (f *Forwarder) Start(ctx context.Context, in <-chan model.LogEntry, storage } func (f *Forwarder) Retry(ctx context.Context) error { - keys, listErr := f.storage.List(ctx) - if listErr != nil { - return fmt.Errorf("list: %w", listErr) + if f.storage == nil { + return nil } - slog.InfoContext(ctx, "retrying stored batches", slog.Int("count", len(keys))) - for _, key := range keys { - payload, storageTag, getErr := f.storage.Get(ctx, key) - if getErr != nil { - return fmt.Errorf("get: %w", getErr) + f.header.Add(ddRetryHeader, "true") + defer func() { f.header.Del(ddRetryHeader) }() + + for batch, err := range f.storage.Fetch(ctx) { + if err != nil { + return fmt.Errorf("fetch: %w", err) } - if sendErr := f.send(ctx, payload, storageTag); sendErr != nil { + if sendErr := f.send(ctx, batch.Data, batch.StorageTag); sendErr != nil { return fmt.Errorf("send: %w", sendErr) } - if deleteErr := f.storage.Delete(ctx, key); deleteErr != nil { + if deleteErr := f.storage.Delete(ctx, batch); deleteErr != nil { return fmt.Errorf("delete: %w", deleteErr) } - - slog.DebugContext(ctx, "batch sent successfully", slog.String("key", key)) } return nil } -func (f *Forwarder) send(ctx context.Context, payload []byte, storageTag string) error { +func (f *Forwarder) send(ctx context.Context, payload json.RawMessage, storageTag string) error { ctx, cancel := context.WithTimeout(ctx, httpclient.RequestTimeout) defer cancel() @@ -184,7 +191,7 @@ func (f *Forwarder) send(ctx context.Context, payload []byte, storageTag string) return nil } -func (f *Forwarder) compress(payload []byte) ([]byte, error) { +func (f *Forwarder) compress(payload json.RawMessage) (json.RawMessage, error) { buf := bufPool.Get().(*bytes.Buffer) gz := f.gzipPool.Get().(*gzip.Writer) defer bufPool.Put(buf) diff --git a/aws/logs_monitoring_go/internal/sdkclient/sqs.go b/aws/logs_monitoring_go/internal/sdkclient/sqs.go new file mode 100644 index 000000000..3cc8e1f6b --- /dev/null +++ b/aws/logs_monitoring_go/internal/sdkclient/sqs.go @@ -0,0 +1,29 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package sdkclient + +//go:generate go tool mockgen -source=sqs.go -package=$GOPACKAGE -destination=sqs_mockgen.go + +import ( + "context" + "sync" + + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +var GetSQS = sync.OnceValues(func() (SQS, error) { + cfg, err := AWSConfig() + if err != nil { + return nil, err + } + return sqs.NewFromConfig(cfg), nil +}) + +type SQS interface { + SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) + ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) + DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) +} diff --git a/aws/logs_monitoring_go/internal/sdkclient/sqs_mockgen.go b/aws/logs_monitoring_go/internal/sdkclient/sqs_mockgen.go new file mode 100644 index 000000000..02dd37d9e --- /dev/null +++ b/aws/logs_monitoring_go/internal/sdkclient/sqs_mockgen.go @@ -0,0 +1,102 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: sqs.go +// +// Generated by this command: +// +// mockgen -source=sqs.go -package=sdkclient -destination=sqs_mockgen.go +// + +// Package sdkclient is a generated GoMock package. +package sdkclient + +import ( + context "context" + reflect "reflect" + + sqs "github.com/aws/aws-sdk-go-v2/service/sqs" + gomock "go.uber.org/mock/gomock" +) + +// MockSQS is a mock of SQS interface. +type MockSQS struct { + ctrl *gomock.Controller + recorder *MockSQSMockRecorder + isgomock struct{} +} + +// MockSQSMockRecorder is the mock recorder for MockSQS. +type MockSQSMockRecorder struct { + mock *MockSQS +} + +// NewMockSQS creates a new mock instance. +func NewMockSQS(ctrl *gomock.Controller) *MockSQS { + mock := &MockSQS{ctrl: ctrl} + mock.recorder = &MockSQSMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSQS) EXPECT() *MockSQSMockRecorder { + return m.recorder +} + +// DeleteMessage mocks base method. +func (m *MockSQS) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteMessage", varargs...) + ret0, _ := ret[0].(*sqs.DeleteMessageOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteMessage indicates an expected call of DeleteMessage. +func (mr *MockSQSMockRecorder) DeleteMessage(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MockSQS)(nil).DeleteMessage), varargs...) +} + +// ReceiveMessage mocks base method. +func (m *MockSQS) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReceiveMessage", varargs...) + ret0, _ := ret[0].(*sqs.ReceiveMessageOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReceiveMessage indicates an expected call of ReceiveMessage. +func (mr *MockSQSMockRecorder) ReceiveMessage(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockSQS)(nil).ReceiveMessage), varargs...) +} + +// SendMessage mocks base method. +func (m *MockSQS) SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessage", varargs...) + ret0, _ := ret[0].(*sqs.SendMessageOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessage indicates an expected call of SendMessage. +func (mr *MockSQSMockRecorder) SendMessage(ctx, params any, optFns ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MockSQS)(nil).SendMessage), varargs...) +} diff --git a/aws/logs_monitoring_go/internal/storing/s3.go b/aws/logs_monitoring_go/internal/storing/s3.go index 3aad01123..89b9364b8 100644 --- a/aws/logs_monitoring_go/internal/storing/s3.go +++ b/aws/logs_monitoring_go/internal/storing/s3.go @@ -10,6 +10,7 @@ import ( "context" "fmt" "io" + "iter" "log/slog" "sync/atomic" "time" @@ -18,6 +19,7 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" ) const prefix = "failed_events/" @@ -33,7 +35,7 @@ func newS3(client sdkclient.S3, bucket string) *S3 { return &S3{client: client, bucket: bucket} } -func (s *S3) Put(ctx context.Context, batch []byte, storageTag string) error { +func (s *S3) Store(ctx context.Context, batch Batch) error { invocationID := "unknown" if lc, ok := lambdacontext.FromContext(ctx); ok { invocationID = lc.AwsRequestID @@ -45,8 +47,8 @@ func (s *S3) Put(ctx context.Context, batch []byte, storageTag string) error { _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(key), - Body: bytes.NewReader(batch), - Metadata: map[string]string{metadataStorageTagKey: storageTag}, + Body: bytes.NewReader(batch.Data), + Metadata: map[string]string{metadataStorageTagKey: batch.StorageTag}, }) if err != nil { return err @@ -55,51 +57,63 @@ func (s *S3) Put(ctx context.Context, batch []byte, storageTag string) error { return nil } -func (s *S3) List(ctx context.Context) ([]string, error) { - out, err := s.client.ListObjectsV2(ctx, - &s3.ListObjectsV2Input{ - Bucket: aws.String(s.bucket), - Prefix: aws.String(prefix), - }, - ) - if err != nil { - return nil, err - } +func (s *S3) Fetch(ctx context.Context) iter.Seq2[Batch, error] { + return func(yield func(Batch, error) bool) { + listOut, err := s.client.ListObjectsV2(ctx, + &s3.ListObjectsV2Input{ + Bucket: aws.String(s.bucket), + Prefix: aws.String(prefix), + }, + ) + if err != nil { + yield(Batch{}, fmt.Errorf("list objects: %w", err)) + return + } - keys := make([]string, 0, len(out.Contents)) - for _, obj := range out.Contents { - keys = append(keys, aws.ToString(obj.Key)) - } + for _, object := range listOut.Contents { + storedBatch, err := s.getBatch(ctx, object) + if err != nil { + yield(storedBatch, err) + return + } - return keys, nil + if !yield(storedBatch, err) { + return + } + } + } } -func (s *S3) Get(ctx context.Context, key string) ([]byte, string, error) { +func (s *S3) getBatch(ctx context.Context, object types.Object) (Batch, error) { out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(s.bucket), - Key: aws.String(key), + Key: aws.String(aws.ToString(object.Key)), }) if err != nil { - return nil, "", err + return Batch{}, fmt.Errorf("get object: %w", err) } defer func() { if err := out.Body.Close(); err != nil { - slog.WarnContext(ctx, "closing body", slog.Any("error", err)) + slog.WarnContext(ctx, "close body", slog.Any("error", err)) } }() batch, err := io.ReadAll(out.Body) if err != nil { - return nil, "", err + return Batch{}, fmt.Errorf("read body: %w", err) } - return batch, out.Metadata[metadataStorageTagKey], nil + return Batch{ + Data: batch, + StorageTag: out.Metadata[metadataStorageTagKey], + DeleteKey: aws.ToString(object.Key), + }, nil } -func (s *S3) Delete(ctx context.Context, key string) error { +func (s *S3) Delete(ctx context.Context, batch Batch) error { _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(s.bucket), - Key: aws.String(key), + Key: aws.String(batch.DeleteKey), }) if err != nil { return err diff --git a/aws/logs_monitoring_go/internal/storing/s3_test.go b/aws/logs_monitoring_go/internal/storing/s3_test.go index a5e31c80c..c35d8137f 100644 --- a/aws/logs_monitoring_go/internal/storing/s3_test.go +++ b/aws/logs_monitoring_go/internal/storing/s3_test.go @@ -6,50 +6,49 @@ package storing import ( + "bytes" + "encoding/json" "errors" "io" - "strings" "testing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/sdkclient" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/stretchr/testify/assert" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) -const ( - testBucket = "my-bucket" - testKey = "failed_events/2026/06/01/120000000_req-id_1.json" -) - -func TestS3_Put(t *testing.T) { +func TestS3_Store(t *testing.T) { t.Parallel() tests := map[string]struct { - mockSetup func(m *sdkclient.MockS3) - storageTag string - wantErr bool + mockSetup func(m *sdkclient.MockS3) + batch Batch + wantErr bool }{ "success": { mockSetup: func(m *sdkclient.MockS3) { m.EXPECT(). - PutObject(gomock.Any(), gomock.Cond(func(input *s3.PutObjectInput) bool { - return aws.ToString(input.Bucket) == testBucket && - strings.HasPrefix(aws.ToString(input.Key), prefix) && - input.Metadata["dd-storage-tag"] == "s3" - })). + PutObject(gomock.Any(), gomock.Any()). Return(nil, nil) }, - storageTag: "s3", + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 100), + StorageTag: "cloudwatch", + }, }, - "error": { + "error on PutObject call": { mockSetup: func(m *sdkclient.MockS3) { m.EXPECT(). PutObject(gomock.Any(), gomock.Any()). - Return(nil, errors.New("")) + Return(nil, errors.New("denied")) + }, + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 100), + StorageTag: "cloudwatch", }, wantErr: true, }, @@ -62,9 +61,9 @@ func TestS3_Put(t *testing.T) { ctrl := gomock.NewController(t) mock := sdkclient.NewMockS3(ctrl) tc.mockSetup(mock) - storage := newS3(mock, testBucket) + storage := newS3(mock, "my-bucket") - err := storage.Put(t.Context(), []byte{}, tc.storageTag) + err := storage.Store(t.Context(), tc.batch) if tc.wantErr { require.Error(t, err) @@ -75,95 +74,61 @@ func TestS3_Put(t *testing.T) { } } -func TestS3_List(t *testing.T) { +func TestS3_Fetch(t *testing.T) { t.Parallel() tests := map[string]struct { mockSetup func(m *sdkclient.MockS3) - wantKeys []string wantErr bool }{ "success": { mockSetup: func(m *sdkclient.MockS3) { - m.EXPECT(). - ListObjectsV2(gomock.Any(), gomock.Cond(func(input *s3.ListObjectsV2Input) bool { - return aws.ToString(input.Bucket) == testBucket && - aws.ToString(input.Prefix) == prefix - })). - Return(&s3.ListObjectsV2Output{ - Contents: []types.Object{ - {Key: aws.String("failed_events/2026/06/01/120000000_req-id_1.json")}, - {Key: aws.String("failed_events/2026/06/01/120000000_req-id_2.json")}, - {Key: aws.String("failed_events/2026/06/01/120000001_req-id_3.json")}, - }, - }, nil) - }, - wantKeys: []string{ - "failed_events/2026/06/01/120000000_req-id_1.json", - "failed_events/2026/06/01/120000000_req-id_2.json", - "failed_events/2026/06/01/120000001_req-id_3.json", + gomock.InOrder( + m.EXPECT(). + ListObjectsV2(gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []s3types.Object{ + {Key: aws.String("failed_events/2026/06/01/120000000_req-id_1.json")}, + {Key: aws.String("failed_events/2026/06/01/120000000_req-id_2.json")}, + }, + }, nil), + m.EXPECT(). + GetObject(gomock.Any(), gomock.Any()). + Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(json.RawMessage(`[]`))), + Metadata: map[string]string{}, + }, nil), + m.EXPECT(). + GetObject(gomock.Any(), gomock.Any()). + Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(json.RawMessage(`[]`))), + Metadata: map[string]string{}, + }, nil), + ) }, }, - "error": { + "error on ListObjectsV2 call": { mockSetup: func(m *sdkclient.MockS3) { m.EXPECT(). ListObjectsV2(gomock.Any(), gomock.Any()). - Return(nil, errors.New("")) + Return(nil, errors.New("denied")) }, wantErr: true, }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - ctrl := gomock.NewController(t) - mock := sdkclient.NewMockS3(ctrl) - tc.mockSetup(mock) - storage := newS3(mock, testBucket) - - keys, err := storage.List(t.Context()) - - if tc.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - assert.Equal(t, tc.wantKeys, keys) - }) - } -} - -func TestS3_Get(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - mockSetup func(m *sdkclient.MockS3) - wantBody []byte - wantStorageTag string - wantErr bool - }{ - "success": { - mockSetup: func(m *sdkclient.MockS3) { - m.EXPECT(). - GetObject(gomock.Any(), gomock.Cond(func(input *s3.GetObjectInput) bool { - return aws.ToString(input.Bucket) == testBucket && - aws.ToString(input.Key) == testKey - })). - Return(&s3.GetObjectOutput{ - Body: io.NopCloser(strings.NewReader(`[{"message":"hello"}]`)), - Metadata: map[string]string{"dd-storage-tag": "cloudwatch"}, - }, nil) - }, - wantBody: []byte(`[{"message":"hello"}]`), - wantStorageTag: "cloudwatch", - }, - "error": { + "error on GetObject call": { mockSetup: func(m *sdkclient.MockS3) { - m.EXPECT(). - GetObject(gomock.Any(), gomock.Any()). - Return(nil, errors.New("")) + gomock.InOrder( + m.EXPECT(). + ListObjectsV2(gomock.Any(), gomock.Any()). + Return(&s3.ListObjectsV2Output{ + Contents: []s3types.Object{ + {Key: aws.String("failed_events/2026/06/01/120000000_req-id_1.json")}, + }, + }, nil), + m.EXPECT(). + GetObject(gomock.Any(), gomock.Any()). + Return(nil, errors.New("denied")), + ) }, wantErr: true, }, @@ -176,17 +141,15 @@ func TestS3_Get(t *testing.T) { ctrl := gomock.NewController(t) mock := sdkclient.NewMockS3(ctrl) tc.mockSetup(mock) - storage := newS3(mock, testBucket) - - body, storageTag, err := storage.Get(t.Context(), testKey) - - if tc.wantErr { - require.Error(t, err) - return + storage := newS3(mock, "failed_events") + + for _, err := range storage.Fetch(t.Context()) { + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) } - require.NoError(t, err) - assert.Equal(t, tc.wantBody, body) - assert.Equal(t, tc.wantStorageTag, storageTag) }) } } @@ -196,23 +159,27 @@ func TestS3_Delete(t *testing.T) { tests := map[string]struct { mockSetup func(m *sdkclient.MockS3) + batch Batch wantErr bool }{ "success": { mockSetup: func(m *sdkclient.MockS3) { m.EXPECT(). - DeleteObject(gomock.Any(), gomock.Cond(func(input *s3.DeleteObjectInput) bool { - return aws.ToString(input.Bucket) == testBucket && - aws.ToString(input.Key) == testKey - })). + DeleteObject(gomock.Any(), gomock.Any()). Return(nil, nil) }, + batch: Batch{ + DeleteKey: "failed_events/2026/06/01/120000000_req-id_1.json", + }, }, - "error": { + "error on DeleteObject call": { mockSetup: func(m *sdkclient.MockS3) { m.EXPECT(). DeleteObject(gomock.Any(), gomock.Any()). - Return(nil, errors.New("")) + Return(nil, errors.New("denied")) + }, + batch: Batch{ + DeleteKey: "failed_events/2026/06/01/120000000_req-id_1.json", }, wantErr: true, }, @@ -225,9 +192,9 @@ func TestS3_Delete(t *testing.T) { ctrl := gomock.NewController(t) mock := sdkclient.NewMockS3(ctrl) tc.mockSetup(mock) - storage := newS3(mock, testBucket) + storage := newS3(mock, "my-bucket") - err := storage.Delete(t.Context(), testKey) + err := storage.Delete(t.Context(), tc.batch) if tc.wantErr { require.Error(t, err) diff --git a/aws/logs_monitoring_go/internal/storing/sqs.go b/aws/logs_monitoring_go/internal/storing/sqs.go new file mode 100644 index 000000000..dfdb97388 --- /dev/null +++ b/aws/logs_monitoring_go/internal/storing/sqs.go @@ -0,0 +1,136 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package storing + +import ( + "context" + "encoding/json" + "fmt" + "iter" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/batching" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/sdkclient" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "golang.org/x/sync/errgroup" +) + +const ( + maxSizePerSQSMessage = 1000 * 1024 // Overhead for other attributes than message body + maxNumberOfMessages = 10 + polling = 10 + visibilityTimeout = 6 * 60 + waitTimeSeconds = 0 +) + +type SQS struct { + client sdkclient.SQS + queue string +} + +func newSQS(client sdkclient.SQS, queue string) *SQS { + return &SQS{client: client, queue: queue} +} + +func (s *SQS) Store(ctx context.Context, batch Batch) error { + var logs []json.RawMessage + if err := json.Unmarshal(batch.Data, &logs); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + + eg, ctx := errgroup.WithContext(ctx) + + in := make(chan json.RawMessage, 10) + eg.Go(func() error { + defer close(in) + for _, log := range logs { + if err := concurrent.SafeSender(ctx, in, log); err != nil { + return err + } + } + return nil + }) + + out := make(chan json.RawMessage) + eg.Go(func() error { + defer close(out) + batcher := batching.New[json.RawMessage](batching.NewConfig(maxSizePerSQSMessage, maxSizePerSQSMessage, 0)) + return batcher.Start(ctx, in, out) + }) + + eg.Go(func() error { + for { + messageBody, ok, _ := concurrent.SafeReader(ctx, out) + if !ok { + break + } + + _, err := s.client.SendMessage(ctx, &sqs.SendMessageInput{ + QueueUrl: &s.queue, + MessageBody: aws.String(string(messageBody)), + MessageAttributes: map[string]types.MessageAttributeValue{ + metadataStorageTagKey: { + DataType: aws.String("String"), + StringValue: aws.String(batch.StorageTag), + }, + }, + }) + if err != nil { + return fmt.Errorf("send message: %w", err) + } + } + return nil + }) + + return eg.Wait() +} + +func (s *SQS) Fetch(ctx context.Context) iter.Seq2[Batch, error] { + return func(yield func(Batch, error) bool) { + for range polling { + out, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: &s.queue, + MaxNumberOfMessages: maxNumberOfMessages, + MessageAttributeNames: []string{metadataStorageTagKey}, + VisibilityTimeout: visibilityTimeout, + WaitTimeSeconds: waitTimeSeconds, + }) + if err != nil { + yield(Batch{}, fmt.Errorf("receive message: %w", err)) + return + } + + if len(out.Messages) == 0 { + return + } + + for _, message := range out.Messages { + storedBatch := Batch{ + Data: []byte(aws.ToString(message.Body)), + StorageTag: aws.ToString(message.MessageAttributes[metadataStorageTagKey].StringValue), + DeleteKey: aws.ToString(message.ReceiptHandle), + } + if !yield(storedBatch, nil) { + return + } + } + } + } +} + +func (s *SQS) Delete(ctx context.Context, batch Batch) error { + _, err := s.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{ + QueueUrl: &s.queue, + ReceiptHandle: &batch.DeleteKey, + }) + if err != nil { + return fmt.Errorf("delete message: %w", err) + } + + return nil +} diff --git a/aws/logs_monitoring_go/internal/storing/sqs_test.go b/aws/logs_monitoring_go/internal/storing/sqs_test.go new file mode 100644 index 000000000..407124bd4 --- /dev/null +++ b/aws/logs_monitoring_go/internal/storing/sqs_test.go @@ -0,0 +1,194 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package storing + +import ( + "errors" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/sdkclient" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestSQS_Store(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + mockSetup func(m *sdkclient.MockSQS) + batch Batch + wantErr bool + }{ + "five 100KB logs send one message": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + SendMessage(gomock.Any(), gomock.Any()). + Return(nil, nil) + }, + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 100*1024, 100*1024, 100*1024, 100*1024, 100*1024), + }, + }, + "two 512KB logs send two messages": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + SendMessage(gomock.Any(), gomock.Any()). + Times(2). + Return(nil, nil) + }, + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 512*1024, 512*1024), + }, + }, + "one 1MiB log send no message": { + mockSetup: func(m *sdkclient.MockSQS) { + }, + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 1*1024*1024), + }, + }, + "error on SendMessage call": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + SendMessage(gomock.Any(), gomock.Any()). + Return(nil, errors.New("denied")) + }, + batch: Batch{ + Data: testutil.GenerateJSONLogs(t, 1024), + }, + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mock := sdkclient.NewMockSQS(ctrl) + tc.mockSetup(mock) + sqs := newSQS(mock, "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue") + + err := sqs.Store(t.Context(), tc.batch) + + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +func TestSQS_Fetch(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + mockSetup func(m *sdkclient.MockSQS) + wantErr bool + }{ + "exit on empty poll": { + mockSetup: func(m *sdkclient.MockSQS) { + gomock.InOrder( + m.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Any()). + Return(&sqs.ReceiveMessageOutput{ + Messages: []types.Message{{}}, + }, nil), + m.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Any()). + Return(&sqs.ReceiveMessageOutput{ + Messages: []types.Message{{}}, + }, nil), + m.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Any()). + Return(&sqs.ReceiveMessageOutput{}, nil), + ) + }, + }, + "error on ReceiveMessage": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + ReceiveMessage(gomock.Any(), gomock.Any()). + Return(nil, errors.New("denied")) + }, + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mock := sdkclient.NewMockSQS(ctrl) + tc.mockSetup(mock) + s := newSQS(mock, "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue") + + for _, err := range s.Fetch(t.Context()) { + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + } + }) + } +} + +func TestSQS_Delete(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + mockSetup func(m *sdkclient.MockSQS) + batch Batch + wantErr bool + }{ + "success": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + DeleteMessage(gomock.Any(), gomock.Any()). + Return(nil, nil) + }, + batch: Batch{ + DeleteKey: "MbZj6wDWli+JvwwJaBV+3d=", + }, + }, + "error on DeleteMessage call": { + mockSetup: func(m *sdkclient.MockSQS) { + m.EXPECT(). + DeleteMessage(gomock.Any(), gomock.Any()). + Return(nil, errors.New("denied")) + }, + batch: Batch{ + DeleteKey: "MbZj6wDWli+JvwwJaBV+3d=", + }, + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mock := sdkclient.NewMockSQS(ctrl) + tc.mockSetup(mock) + sqs := newSQS(mock, "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue") + + err := sqs.Delete(t.Context(), tc.batch) + + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/aws/logs_monitoring_go/internal/storing/storage.go b/aws/logs_monitoring_go/internal/storing/storage.go index f36200fad..0f127bb2b 100644 --- a/aws/logs_monitoring_go/internal/storing/storage.go +++ b/aws/logs_monitoring_go/internal/storing/storage.go @@ -7,24 +7,40 @@ package storing import ( "context" + "encoding/json" + "iter" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/sdkclient" ) const metadataStorageTagKey = "dd-storage-tag" +type Batch struct { + Data json.RawMessage + StorageTag string + DeleteKey string +} + type Storage interface { - Put(ctx context.Context, batch []byte, storageTag string) error - List(ctx context.Context) ([]string, error) - Get(ctx context.Context, key string) ([]byte, string, error) - Delete(ctx context.Context, key string) error + Store(ctx context.Context, batch Batch) error + Fetch(ctx context.Context) iter.Seq2[Batch, error] + Delete(ctx context.Context, batch Batch) error } type Options struct { S3Bucket string + SQSQueue string } -func NewStorage(ctx context.Context, opts Options) (Storage, error) { +func NewStorage(opts Options) (Storage, error) { + if opts.SQSQueue != "" { + sqsClient, err := sdkclient.GetSQS() + if err != nil { + return nil, err + } + return newSQS(sqsClient, opts.SQSQueue), nil + } + if opts.S3Bucket != "" { s3Client, err := sdkclient.GetS3() if err != nil { diff --git a/aws/logs_monitoring_go/internal/testutil/testutil.go b/aws/logs_monitoring_go/internal/testutil/testutil.go index 16f3e377b..42fe697e9 100644 --- a/aws/logs_monitoring_go/internal/testutil/testutil.go +++ b/aws/logs_monitoring_go/internal/testutil/testutil.go @@ -11,7 +11,9 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" "regexp" + "strings" "testing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" @@ -119,3 +121,54 @@ func MustKinesisEvent(t *testing.T, records ...[]byte) json.RawMessage { } return raw } + +func GenerateJSONLog(t *testing.T, size int) json.RawMessage { + t.Helper() + + const template = `{"id":"0","timestamp":1718540400000,"message":"%s","ddsourcecategory":"aws"}` + overhead := len(fmt.Sprintf(template, "")) + + padding := size - overhead + if padding < 0 { + t.Fatalf("GenerateJSONLog: requested size %d is smaller than the fixed overhead %d", size, overhead) + } + + return json.RawMessage(fmt.Sprintf(template, strings.Repeat("x", padding))) +} + +func GenerateJSONLogs(t *testing.T, sizes ...int) json.RawMessage { + t.Helper() + + logs := make([]json.RawMessage, 0, len(sizes)) + for _, size := range sizes { + logs = append(logs, GenerateJSONLog(t, size)) + } + + data, err := json.Marshal(logs) + if err != nil { + t.Fatalf("GenerateJSONLogs: marshal: %v", err) + } + return data +} + +func ToChannel[T any](t *testing.T, values []T) chan T { + t.Helper() + + ch := make(chan T, len(values)) + for _, v := range values { + ch <- v + } + close(ch) + + return ch +} + +func Drain[T any](t *testing.T, ch <-chan T) (values []T) { + t.Helper() + + for v := range ch { + values = append(values, v) + } + + return values +}