Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,49 @@ package main
import (
"context"
"encoding/json"
"log/slog"
"errors"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"

"github.com/aws/aws-lambda-go/lambda"
)

func main() {
ctx := context.Background()
cfg, err := config.Load(ctx)
cfg, err := config.Load()
if err != nil {
slog.Error("config load failed", slog.Any("error", err))
return
panic(err)
}
err = cfg.ResolveAPIKey(context.Background())
if err != nil {
panic(err)
}
err = cfg.ValidateAPIKey(context.Background())
if err != nil {
panic(err)
}

cwHandler := handling.NewCloudwatch(cfg)
kinesisHandler := handling.NewKinesis(cfg)
s3Handler := handling.NewS3(cfg)
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
handling.Register(parsing.InvocationSourceS3, s3Handler)

lambda.Start(handleRequest(cfg))
}

func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error {
func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
invocationSource := parsing.DetectInvocationSource(event)
switch invocationSource {
case parsing.InvocationSourceCloudwatchLogs:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatch)
case parsing.InvocationSourceKinesis:
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
case parsing.InvocationSourceS3:
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
default:
slog.Error("unsupported invocation source", slog.String("source", invocationSource.String()))
return nil
invocation := parsing.DetectInvocationSource(event)
if invocation == parsing.InvocationSourceUnknown {
return errors.New("unknown invocation")
}

run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
return pipeline.Start(ctx, event, run)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package processing
package batching

import (
"bytes"
Expand All @@ -12,6 +12,7 @@ import (
"log/slog"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

const (
Expand All @@ -20,18 +21,18 @@ const (
maxItemsPerBatch = 1000
)

type Batcher[T any] struct {
type Batcher struct {
batch [][]byte
batchSize int
}

func NewBatcher[T any]() *Batcher[T] {
return &Batcher[T]{
func NewBatcher() *Batcher {
return &Batcher{
batch: make([][]byte, 0, maxItemsPerBatch),
}
}

func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte) error {
func (b *Batcher) Batch(ctx context.Context, in <-chan model.LogEntry, out chan<- []byte) error {
for {
entry, ok, err := concurrent.SafeReader(ctx, in)
if err != nil {
Expand Down Expand Up @@ -66,7 +67,7 @@ func (b *Batcher[T]) Batch(ctx context.Context, in <-chan T, out chan<- []byte)
}
}

func (b *Batcher[T]) flush(ctx context.Context, out chan<- []byte) error {
func (b *Batcher) flush(ctx context.Context, out chan<- []byte) error {
if len(b.batch) == 0 {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package processing
package batching

import (
"encoding/json"
Expand All @@ -26,25 +26,29 @@ func TestBatch(t *testing.T) {
wantBatchCount: 0,
},
"single entry": {
entries: []model.LogEntry{makeTestLogEntry()},
entries: []model.LogEntry{model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{1},
},
"multiple entries, one batch": {
entries: []model.LogEntry{makeTestLogEntry(), makeTestLogEntry(), makeTestLogEntry()},
entries: []model.LogEntry{model.NewLogEntry(), model.NewLogEntry(), model.NewLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{3},
},
"drop oversized entry": {
entries: []model.LogEntry{model.NewLogEntry(nil, nil, strings.Repeat("x", maxItemSize+1), "", ""), makeTestLogEntry()},
wantBatchCount: 1,
wantEntryCounts: []int{1},
entries: func() []model.LogEntry {
entry := model.NewLogEntry()
entry.Message = strings.Repeat("a", maxItemSize+1)
return []model.LogEntry{entry}
}(),
wantBatchCount: 0,
wantEntryCounts: nil,
},
"split": {
entries: func() []model.LogEntry {
entries := make([]model.LogEntry, maxItemsPerBatch+1)
for i := range entries {
entries[i] = makeTestLogEntry()
entries[i] = model.NewLogEntry()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥜 nitpick: ‏The added value of the NewLogEntry is to put aws as the sourcecategory. I don't think it's strictly necessary for the test.

We could avoid doing the loop on entries, the line 49 is already allocating the slice with non-pointer struct, so struct are already allocated (empty).

}
return entries
}(),
Expand All @@ -57,51 +61,38 @@ func TestBatch(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

batches := collectBatches(t, feedChannel(tc.entries...))
in := make(chan model.LogEntry, len(tc.entries))
out := make(chan []byte, len(tc.wantEntryCounts))
for _, entry := range tc.entries {
in <- entry
}
close(in)

batcher := NewBatcher()
err := batcher.Batch(t.Context(), in, out)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
close(out)

var batches [][]byte
for b := range out {
batches = append(batches, b)
}

if len(batches) != tc.wantBatchCount {
t.Fatalf("expected %d batches, got %d", tc.wantBatchCount, len(batches))
t.Fatalf("Batch() got %d batches, want %d", len(batches), tc.wantBatchCount)
}

for i, wantCount := range tc.wantEntryCounts {
var entries []model.LogEntry
if err := json.Unmarshal(batches[i], &entries); err != nil {
t.Fatalf("failed to unmarshal batch %d: %v", i, err)
t.Fatalf("Batch() failed to unmarshal batch %d: %v", i, err)
}
if len(entries) != wantCount {
t.Errorf("batch %d: expected %d entries, got %d", i, wantCount, len(entries))
t.Errorf("Batch() batch %d: got %d entries, want %d", i, len(entries), wantCount)
}
}
})
}
}

func makeTestLogEntry() model.LogEntry {
return model.NewLogEntry(nil, nil, "test", "test", "test")
}

func feedChannel(entries ...model.LogEntry) <-chan model.LogEntry {
ch := make(chan model.LogEntry, len(entries))
for _, e := range entries {
ch <- e
}
close(ch)
return ch
}

func collectBatches(t *testing.T, in <-chan model.LogEntry) [][]byte {
t.Helper()

out := make(chan []byte, 100)
batcher := NewBatcher[model.LogEntry]()
err := batcher.Batch(t.Context(), in, out)
close(out)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

var batches [][]byte
for b := range out {
batches = append(batches, b)
}
return batches
}
4 changes: 2 additions & 2 deletions aws/logs_monitoring_go/internal/config/apikey.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type apiKeyResolver struct {
resolve func(ctx context.Context, value string) (string, error)
}

func (c *Config) resolveAPIKey(ctx context.Context) error {
func (c *Config) ResolveAPIKey(ctx context.Context) error {
resolvers := []apiKeyResolver{
{"DD_API_KEY_SECRET_ARN", c.resolveAPIKeyFromSecretsManager},
{"DD_API_KEY_SSM_NAME", c.resolveAPIKeyFromSSM},
Expand All @@ -55,7 +55,7 @@ func (c *Config) resolveAPIKey(ctx context.Context) error {
return errors.New("no API key configured: set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/")
}

func (c *Config) validateAPIKey(ctx context.Context) error {
func (c *Config) ValidateAPIKey(ctx context.Context) error {
if c.APIKey == "" {
return fmt.Errorf("set DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME or DD_KMS_API_KEY. See: https://docs.datadoghq.com/serverless/forwarder/: %w", ErrMissingAPIKey)
}
Expand Down
Loading
Loading