diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index d31ad219a..46ba69d2e 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,11 +8,9 @@ package main import ( "context" "encoding/json" - "errors" + "fmt" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" @@ -33,24 +31,15 @@ func main() { panic(err) } - cwHandler := handling.NewCloudwatch(cfg) - kinesisHandler := handling.NewKinesis(cfg) - s3Handler := handling.NewS3(cfg) - handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler) - handling.Register(parsing.InvocationSourceKinesis, kinesisHandler) - handling.Register(parsing.InvocationSourceS3, s3Handler) - lambda.Start(handleRequest(cfg)) } func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - invocation := parsing.DetectInvocationSource(event) - if invocation == parsing.InvocationSourceUnknown { - return errors.New("unknown invocation") + parsed, err := parsing.Parse(event) + if err != nil { + return fmt.Errorf("parse: %w", err) } - - run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation)) - return pipeline.Start(ctx, event, run) + return pipeline.Start(ctx, parsed, cfg) } } diff --git a/aws/logs_monitoring_go/internal/forwarding/storage.go b/aws/logs_monitoring_go/internal/forwarding/storage.go index 292ac75e9..53b1bd03b 100644 --- a/aws/logs_monitoring_go/internal/forwarding/storage.go +++ b/aws/logs_monitoring_go/internal/forwarding/storage.go @@ -12,11 +12,11 @@ const ( s3Storage = "s3" ) -func Storage(source parsing.InvocationSource) string { - switch source { - case parsing.InvocationSourceS3: +func StorageFromContentType(contentType parsing.ContentType) string { + switch contentType { + case parsing.ContentTypeS3: return s3Storage - case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis: + case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis: return cloudwatchStorage default: return "" diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 25249f419..a8cc4a751 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler { } } -func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var cwEvent events.CloudwatchLogsEvent if err := json.Unmarshal(event, &cwEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge.go b/aws/logs_monitoring_go/internal/handling/eventbridge.go new file mode 100644 index 000000000..06ab7619e --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "bytes" + "cmp" + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +type EventBridgeHandler struct { + cfg *config.Config +} + +func NewEventBridge(cfg *config.Config) *EventBridgeHandler { + return &EventBridgeHandler{ + cfg: cfg, + } +} + +func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { + lambdaOrigin, err := model.GetLambdaOrigin(ctx) + if err != nil { + return fmt.Errorf("get lambda origin: %w", err) + } + + ebSource, err := decodeEventBridgeSource(event) + if err != nil { + return err + } + source := cmp.Or(h.cfg.Source, ebSource) + service := cmp.Or(h.cfg.Service, source) + + entry := model.NewLogEntry() + entry.Message = string(event) + entry.Source = source + entry.Service = service + entry.Tags = h.cfg.Tags + entry.Metadata = lambdaOrigin + + if h.cfg.Filter.ShouldExclude(entry.Message) { + return nil + } + + entry.Message = h.cfg.Scrubber.Scrub(entry.Message) + return concurrent.SafeSender(ctx, out, entry) +} + +func decodeEventBridgeSource(event json.RawMessage) (string, error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", errors.New("decode eventbridge source: expected '{'") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "", fmt.Errorf("decode eventbridge source: read key: %w", err) + } + if key == "source" { + var source string + if err := dec.Decode(&source); err != nil { + return "", fmt.Errorf("decode eventbridge source: %w", err) + } + return eventBridgeSource(source), nil + } + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", fmt.Errorf("decode eventbridge source: skip field: %w", err) + } + } + + return "", nil +} + +func eventBridgeSource(source string) string { + _, after, found := strings.Cut(source, ".") + if found { + return after + } + return sourceCloudwatch +} diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge_test.go b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go new file mode 100644 index 000000000..422e94e13 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go @@ -0,0 +1,132 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestEventBridgeHandler_Handle(t *testing.T) { + t.Parallel() + + ctx := testutil.LambdaContext(t) + + tests := map[string]struct { + event json.RawMessage + cfg *config.Config + want []model.LogEntry + wantErr bool + }{ + "scheduled event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "events", + SourceCategory: "aws", + Service: "events", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "ec2 event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`, + Source: "ec2", + SourceCategory: "aws", + Service: "ec2", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "custom source override": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: &config.Config{Source: "custom-source"}, + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "custom-source", + SourceCategory: "aws", + Service: "custom-source", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "invalid JSON": { + event: json.RawMessage(`not json`), + cfg: testutil.EmptyConfig(), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + handler := NewEventBridge(tc.cfg) + out := make(chan model.LogEntry, len(tc.want)) + + err := handler.Handle(ctx, tc.event, out) + close(out) + + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var got []model.LogEntry + for entry := range out { + got = append(got, entry) + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("entries mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestEventBridgeSource(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + source string + want string + }{ + "aws.events": {source: "aws.events", want: "events"}, + "aws.ec2": {source: "aws.ec2", want: "ec2"}, + "aws.s3": {source: "aws.s3", want: "s3"}, + "custom.app": {source: "custom.app", want: "app"}, + "no dot": {source: "nodot", want: "cloudwatch"}, + "empty string": {source: "", want: "cloudwatch"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + got := eventBridgeSource(tc.source) + if got != tc.want { + t.Errorf("got %q, want %q", got, tc.want) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go index 6edab85a3..49d8b204d 100644 --- a/aws/logs_monitoring_go/internal/handling/handler.go +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -8,17 +8,28 @@ package handling import ( "context" "encoding/json" + "fmt" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" ) -var Handlers = make(map[parsing.InvocationSource]Handler) - type Handler interface { Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error } -func Register(invocation parsing.InvocationSource, handler Handler) { - Handlers[invocation] = handler +func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) { + switch ct { + case parsing.ContentTypeCloudwatchLogs: + return NewCloudwatch(cfg), nil + case parsing.ContentTypeS3: + return NewS3(cfg), nil + case parsing.ContentTypeKinesis: + return NewKinesis(cfg), nil + case parsing.ContentTypeEventBridge: + return NewEventBridge(cfg), nil + default: + return nil, fmt.Errorf("unsupported content type: %v", ct) + } } diff --git a/aws/logs_monitoring_go/internal/handling/kinesis.go b/aws/logs_monitoring_go/internal/handling/kinesis.go index a1caf3426..4ce403f58 100644 --- a/aws/logs_monitoring_go/internal/handling/kinesis.go +++ b/aws/logs_monitoring_go/internal/handling/kinesis.go @@ -26,13 +26,13 @@ func NewKinesis(cfg *config.Config) *KinesisHandler { } } -func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var kinesisEvent events.KinesisEvent if err := json.Unmarshal(event, &kinesisEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) } - cw := CloudwatchHandler(h) + cw := CloudwatchHandler(*h) for i, record := range kinesisEvent.Records { cwData, err := decompressCloudwatchLogs(record.Kinesis.Data) if err != nil { diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 38b6ad9a8..4a3c35e0e 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -38,7 +38,7 @@ func NewS3(cfg *config.Config) *S3Handler { } } -func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var s3Event events.S3Event if err := json.Unmarshal(event, &s3Event); err != nil { return fmt.Errorf("unmarshal: %w", err) diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go new file mode 100644 index 000000000..e0b96c0d6 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import "encoding/json" + +//go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go +type ContentType int + +const ( + ContentTypeUnknown ContentType = iota + ContentTypeCloudwatchLogs + ContentTypeS3 + ContentTypeKinesis + ContentTypeEventBridge +) + +type ParsedEvent struct { + ContentType ContentType + Payload json.RawMessage +} diff --git a/aws/logs_monitoring_go/internal/parsing/content_string.go b/aws/logs_monitoring_go/internal/parsing/content_string.go new file mode 100644 index 000000000..3a0a6d398 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/content_string.go @@ -0,0 +1,28 @@ +// Code generated by "stringer -type ContentType -trimprefix ContentType -output content_string.go"; DO NOT EDIT. + +package parsing + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ContentTypeUnknown-0] + _ = x[ContentTypeCloudwatchLogs-1] + _ = x[ContentTypeS3-2] + _ = x[ContentTypeKinesis-3] + _ = x[ContentTypeEventBridge-4] +} + +const _ContentType_name = "UnknownCloudwatchLogsS3KinesisEventBridge" + +var _ContentType_index = [...]uint8{0, 7, 21, 23, 30, 41} + +func (i ContentType) String() string { + idx := int(i) - 0 + if i < 0 || idx >= len(_ContentType_index)-1 { + return "ContentType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ContentType_name[_ContentType_index[idx]:_ContentType_index[idx+1]] +} diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go new file mode 100644 index 000000000..f34291c28 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -0,0 +1,138 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/aws/aws-lambda-go/events" +) + +const ( + eventBridgeSourceS3 = "aws.s3" + eventBridgeDetailTypeS3 = "Object Created" +) + +func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) { + source, detailType, detail, err := decodeEventBridgeEnvelope(event) + if err != nil { + return nil, fmt.Errorf("decode eventbridge: %w", err) + } + + if source == eventBridgeSourceS3 && strings.Contains(detailType, eventBridgeDetailTypeS3) { + s3Event, err := buildS3EventFromEventBridge(detail) + if err != nil { + return nil, fmt.Errorf("build s3 event from eventbridge: %w", err) + } + return []ParsedEvent{{ContentType: ContentTypeS3, Payload: s3Event}}, nil + } + + return []ParsedEvent{{ContentType: ContentTypeEventBridge, Payload: event}}, nil +} + +func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", "", nil, errors.New("expected '{'") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "", "", nil, fmt.Errorf("read key: %w", err) + } + + switch key { + case "source": + if err := dec.Decode(&source); err != nil { + return "", "", nil, fmt.Errorf("source: %w", err) + } + case "detail-type": + if err := dec.Decode(&detailType); err != nil { + return "", "", nil, fmt.Errorf("detail-type: %w", err) + } + case "detail": + if err := dec.Decode(&detail); err != nil { + return "", "", nil, fmt.Errorf("detail: %w", err) + } + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", "", nil, fmt.Errorf("skip field: %w", err) + } + } + } + + return source, detailType, detail, nil +} + +func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error) { + bucketName, objectKey, err := decodeEventBridgeS3Detail(detail) + if err != nil { + return nil, fmt.Errorf("decode: %w", err) + } + + s3Event := events.S3Event{ + Records: []events.S3EventRecord{{ + EventSource: eventSourceS3, + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: bucketName}, + Object: events.S3Object{Key: objectKey, URLDecodedKey: objectKey}, + }, + }}, + } + payload, err := json.Marshal(s3Event) + if err != nil { + return nil, fmt.Errorf("marshal: %w", err) + } + return payload, nil +} + +func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) { + dec := json.NewDecoder(bytes.NewReader(detail)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", "", errors.New("expected '{'") + } + + for dec.More() { + k, err := dec.Token() + if err != nil { + return "", "", fmt.Errorf("read key: %w", err) + } + + switch k { + case "bucket": + var b struct { + Name string `json:"name"` + } + if err := dec.Decode(&b); err != nil { + return "", "", fmt.Errorf("bucket: %w", err) + } + bucket = b.Name + case "object": + var o struct { + Key string `json:"key"` + } + if err := dec.Decode(&o); err != nil { + return "", "", fmt.Errorf("object: %w", err) + } + key = o.Key + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", "", fmt.Errorf("skip field: %w", err) + } + } + } + + return bucket, key, nil +} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source.go b/aws/logs_monitoring_go/internal/parsing/invocation_source.go deleted file mode 100644 index d37fa41da..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source.go +++ /dev/null @@ -1,89 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" -) - -//go:generate stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go -type InvocationSource int - -const ( - InvocationSourceUnknown InvocationSource = iota - InvocationSourceCloudwatchLogs - InvocationSourceS3 - InvocationSourceSNS - InvocationSourceSQS - InvocationSourceKinesis -) - -func DetectInvocationSource(event json.RawMessage) InvocationSource { - dec := json.NewDecoder(bytes.NewReader(event)) - - t, err := dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "awslogs" { - return InvocationSourceCloudwatchLogs - } - - if key == "Records" { - return detectFromRecords(dec) - } - - return InvocationSourceUnknown -} - -func detectFromRecords(dec *json.Decoder) InvocationSource { - t, err := dec.Token() - if err != nil || t != json.Delim('[') { - return InvocationSourceUnknown - } - - t, err = dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - for dec.More() { - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "eventSource" { - val, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - switch val { - case "aws:s3": - return InvocationSourceS3 - case "aws:sns": - return InvocationSourceSNS - case "aws:sqs": - return InvocationSourceSQS - case "aws:kinesis": - return InvocationSourceKinesis - default: - return InvocationSourceUnknown - } - } - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return InvocationSourceUnknown - } - } - - return InvocationSourceUnknown -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go deleted file mode 100644 index 5f0ab09d5..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" - "fmt" - "strings" - "testing" -) - -const benchS3Records = 500 - -var ( - smallCWEvent = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) - largeS3EventFirst = makeS3EventSourceFirst(benchS3Records) - largeS3EventLast = makeS3EventSourceLast(benchS3Records) -) - -var benchCases = []struct { - name string - event json.RawMessage -}{ - {"SmallCW", smallCWEvent}, - {"LargeS3_SourceFirst", largeS3EventFirst}, - {"LargeS3_SourceLast", largeS3EventLast}, -} - -func BenchmarkDetectInvocationSource(b *testing.B) { - for _, tc := range benchCases { - b.Run(tc.name, func(b *testing.B) { - for b.Loop() { - DetectInvocationSource(tc.event) - } - }) - } -} - -func makeS3EventSourceFirst(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"eventSource":"aws:s3","s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}}}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} - -func makeS3EventSourceLast(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}},"eventSource":"aws:s3"}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go deleted file mode 100644 index 4e659dadd..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go +++ /dev/null @@ -1,29 +0,0 @@ -// Code generated by "stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go"; DO NOT EDIT. - -package parsing - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[InvocationSourceUnknown-0] - _ = x[InvocationSourceCloudwatchLogs-1] - _ = x[InvocationSourceS3-2] - _ = x[InvocationSourceSNS-3] - _ = x[InvocationSourceSQS-4] - _ = x[InvocationSourceKinesis-5] -} - -const _InvocationSource_name = "UnknownCloudwatchLogsS3SNSSQSKinesis" - -var _InvocationSource_index = [...]uint8{0, 7, 21, 23, 26, 29, 36} - -func (i InvocationSource) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_InvocationSource_index)-1 { - return "InvocationSource(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _InvocationSource_name[_InvocationSource_index[idx]:_InvocationSource_index[idx+1]] -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go deleted file mode 100644 index 2a76478b9..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - "testing" -) - -func TestDetectInvocationSource(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - event json.RawMessage - wantKey InvocationSource - }{ - "cloudwatch": { - event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - wantKey: InvocationSourceCloudwatchLogs, - }, - "s3": { - event: json.RawMessage(`{"Records":[{"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}},"eventSource":"aws:s3"}]}`), - wantKey: InvocationSourceS3, - }, - "empty object": { - event: json.RawMessage(`{}`), - wantKey: InvocationSourceUnknown, - }, - "not JSON": { - event: json.RawMessage(`not a json`), - wantKey: InvocationSourceUnknown, - }, - "empty input": { - event: json.RawMessage(``), - wantKey: InvocationSourceUnknown, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - got := DetectInvocationSource(tc.event) - if got != tc.wantKey { - t.Errorf("got %d, want %d", got, tc.wantKey) - } - }) - } -} diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go new file mode 100644 index 000000000..a28d0dd44 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -0,0 +1,96 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strings" +) + +const ( + eventSourceKey = "eventSource" + eventSourceS3 = "aws:s3" + eventSourceKinesis = "aws:kinesis" +) + +func Parse(event json.RawMessage) ([]ParsedEvent, error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return nil, errors.New("expected JSON object") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read key: %w", err) + } + + switch key { + case "awslogs": + return []ParsedEvent{{ContentType: ContentTypeCloudwatchLogs, Payload: event}}, nil + case "Records": + return parseRecords(event, dec) + case "detail": + return parseEventBridge(event) + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return nil, fmt.Errorf("skip value: %w", err) + } + } + } + + return nil, errors.New("unsupported event") +} + +func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, error) { + if t, err := dec.Token(); err != nil || t != json.Delim('[') { + return nil, errors.New("records: expected array") + } + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return nil, errors.New("records: expected object") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read record key: %w", err) + } + + if !strings.EqualFold(key.(string), eventSourceKey) { // SNS event source key has a capital letter + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return nil, fmt.Errorf("skip record field: %w", err) + } + continue + } + + val, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read event source value: %w", err) + } + + eventSource, ok := val.(string) + if !ok { + return nil, fmt.Errorf("eventSource is not a string: %v", val) + } + + switch eventSource { + case eventSourceS3: + return []ParsedEvent{{ContentType: ContentTypeS3, Payload: event}}, nil + case eventSourceKinesis: + return []ParsedEvent{{ContentType: ContentTypeKinesis, Payload: event}}, nil + default: + return nil, fmt.Errorf("records: unsupported event source %q", eventSource) + } + } + + return nil, errors.New("no eventSource found in records") +} diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go new file mode 100644 index 000000000..545aeafb5 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "testing" +) + +func TestParse(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + event json.RawMessage + want []ContentType + wantErr bool + }{ + "cloudwatch logs": { + event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), + want: []ContentType{ContentTypeCloudwatchLogs}, + }, + "s3": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`), + want: []ContentType{ContentTypeS3}, + }, + "kinesis": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`), + want: []ContentType{ContentTypeKinesis}, + }, + "eventbridge generic": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "eventbridge s3": { + event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`), + want: []ContentType{ContentTypeS3}, + }, + "eventbridge ec2": { + event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "eventbridge s3 without object created": { + event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "empty object": { + event: json.RawMessage(`{}`), + wantErr: true, + }, + "unsupported source": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), + wantErr: true, + }, + "not JSON": { + event: json.RawMessage(`not json`), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + got, err := Parse(tc.event) + + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(got) != len(tc.want) { + t.Fatalf("got %d events, want %d", len(got), len(tc.want)) + } + + for i, pe := range got { + if pe.ContentType != tc.want[i] { + t.Errorf("event[%d]: got ContentType %v, want %v", i, pe.ContentType, tc.want[i]) + } + if len(pe.Payload) == 0 { + t.Errorf("event[%d]: empty payload", i) + } + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 28ddf1598..d89cba246 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -7,31 +7,49 @@ package pipeline import ( "context" - "encoding/json" + "errors" + "fmt" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "golang.org/x/sync/errgroup" ) func Start( ctx context.Context, - event json.RawMessage, - run *Run, + parsedEvents []parsing.ParsedEvent, + cfg *config.Config, ) error { - g, ctx := errgroup.WithContext(ctx) + if len(parsedEvents) == 0 { + return errors.New("no events to process") + } + + eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) - forwarder := forwarding.NewForwarder(run.Cfg, forwarding.Client, run.Storage) + forwarder := forwarding.NewForwarder(cfg, forwarding.Client, forwarding.StorageFromContentType(parsedEvents[0].ContentType)) - g.Go(func() error { + eg.Go(func() error { defer close(entries) - return run.Handler.Handle(ctx, event, entries) - }) - - g.Go(func() error { - return forwarder.Start(ctx, entries) + for _, parsedEvent := range parsedEvents { + handler, err := handling.NewHandler(parsedEvent.ContentType, cfg) + if err != nil { + return fmt.Errorf("new handler: %w", err) + } + + if err := handler.Handle(ctx, parsedEvent.Payload, entries); err != nil { + return fmt.Errorf("handle: %w", err) + } + } + return nil }) - return g.Wait() + err := forwarder.Start(ctx, entries) + if waitErr := eg.Wait(); waitErr != nil { + return errors.Join(err, waitErr) + } + return err } diff --git a/aws/logs_monitoring_go/internal/pipeline/run.go b/aws/logs_monitoring_go/internal/pipeline/run.go deleted file mode 100644 index 5aee557a1..000000000 --- a/aws/logs_monitoring_go/internal/pipeline/run.go +++ /dev/null @@ -1,25 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package pipeline - -import ( - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" -) - -type Run struct { - Cfg *config.Config - Handler handling.Handler - Storage string -} - -func NewRun(cfg *config.Config, handler handling.Handler, storage string) *Run { - return &Run{ - Cfg: cfg, - Handler: handler, - Storage: storage, - } -}